Description
At the moment, the only way of contacting a task instance in a Samza job is to send a message to one of the job's input streams. Sometimes it would be useful to be able to contact a task instance directly, as a TCP network service. Example use cases:
- Allowing a remote client to query a Samza job's state store (which may, in some cases, obviate the need for writing job output into a separate, publicly readable database).
- Allowing a client to "tap into" a stream without consuming the entire stream. For example, a client may wish to be notified about all log messages matching a particular regular expression as long as they are connected (the notifications stop when they disconnect).
- Performing an expensive on-demand computation quickly by parallelizing it across many tasks in one or more Samza jobs, like in Storm's distributed RPC.
These use cases can be implemented by running a network server (e.g. a HTTP server, a WebSocket server, a Thrift RPC server, etc) in each Samza container. We then need to solve two problems:
- How is a client request routed to the right container? (Assuming Samza is running in a YARN cluster, we have no way of knowing the IP address and port number of a container a priori. Also, the endpoint may change as containers fail and are restarted.)
- How are requests from remote clients integrated with Samza's StreamTask programming model? (Requests may arrive at any time; responses may be delivered immediately or asynchronously at some later time; depending on the protocol, there may be a stream of requests and responses on a single client connection.)
Point (1) is a service discovery problem, for which there are many possible solutions, but no one obvious winner. Helix could be used for this (although Helix also provides various cluster management features that we probably wouldn't need). YARN is considering integrating a service discovery mechanism (YARN-913). Finagle ServerSets and Rest.li D2 both use Zookeeper for service discovery. Looking beyond the end of our JVM-based nose, projects like Consul, SkyDNS and etcd also provide service discovery. It would be worth surveying the landscape and figuring out the various pros and cons before settling on one particular service discovery mechanism. In particular, we should keep in mind the needs of clients that are not written in a JVM-based language.
Whatever service discovery solution is chosen, we will need to decide whether to use a separate TCP port for each TaskInstance within a container, or whether to use some application-level mechanism for deciding which TaskInstance should process a particular incoming request.
For (2) I propose the following: a network service is implemented as just another system (i.e. implementing SystemFactory) in Samza. Every incoming request from a client is wrapped in an IncomingMessageEnvelope, and goes through the same MessageChooser flow as everything else. This ensures that StreamTask remains single-threaded and easy to reason about.
Each connection from a client is given a unique stream name. This allows the StreamTask to tell which requests came from the same client. In order to send a response to a client, the StreamTask sends an OutgoingMessageEnvelope to an output stream, using the same system and stream name as the incoming request. This means that a StreamTask can generate a response immediately if it wants to, or it could send a response asynchronously at some later point in time. It also works for protocols that can have many requests and responses on a long-lived connection, e.g. WebSocket.
Special incoming messages can be used to indicate that a client has connected or disconnected (allowing cleanup of any per-client information in the StreamTask), and a special outgoing message can be used to tell the network service to disconnect a particular client.
If a container is shut down, all of its clients will be disconnected and will need to reconnect. I don't think we need to worry about preserving connections across container restarts.