Description
Samza's existing config setup is problematic for a number of reasons:
- It's completely immutable once a job starts. This prevents any dynamic reconfiguration and auto-scaling. It is debatable whether we want these features or not, but our existing implementation actively prevents them. See SAMZA-334 for discussion.
- We pass existing configuration through environment variables. YARN exports environment variables in a shell script, which limits the size to the varargs length on the machine, usually ~128 KB. See SAMZA-333 and SAMZA-337 for details.
- User-defined configuration (the Config object) and programmatic configuration (checkpoints and TaskName:State mappings, see SAMZA-123) are handled differently. It's debatable whether this makes sense.
In SAMZA-123, jghoman and I propose implementing a ConfigLog. This log would replace both the checkpoint topic and the existing config environment variables in SamzaContainer and Samza's YARN AM.
I'd like to keep this ticket's scope limited to just the implementation of the ConfigLog, and not re-designing how Samza's config is used in the code (SAMZA-40). We should, however, discuss how this feature would affect dynamic reconfiguration/auto-scaling.
Attachments
- DESIGN-SAMZA-348-0.md (30 kB, Chris Riccomini)
- DESIGN-SAMZA-348-0.pdf (220 kB, Chris Riccomini)
- DESIGN-SAMZA-348-1.md (45 kB, Chris Riccomini)
- DESIGN-SAMZA-348-1.pdf (304 kB, Chris Riccomini)
Issue Links
- incorporates: SAMZA-798 Performance and stability issue after combining checkpoint and coordinator stream (Resolved)
- is related to:
  - SAMZA-40 Refactor Samza configuration (Open)
  - SAMZA-42 Add a job setup phase to Samza (Open)
  - SAMZA-333 Large samza configurations results in yarn job failure (Open)
  - SAMZA-374 Need to be able to change SSP Grouper (Resolved)
  - SAMZA-406 Hot standby containers (Open)
- supercedes: SAMZA-237 Consider implementing job control topic to support dynamic inputs, capacity changes, etc. (Resolved)
Sub-Tasks
1. Pass config via HTTP | Resolved | Chris Riccomini
2. Provide a Samza job data model for job coordinator | Resolved | Chris Riccomini
3. Pass config from JobRunner to JobCoordinator via ConfigStream | Resolved | Chris Riccomini
4. Use coordinator stream and eliminate CheckpointManager | Resolved | Naveen Somasundaram
5. Document coordinator stream | Resolved | Navina Ramesh
6. Migrate checkpoint from checkpoint topic to Coordinator stream | Resolved | Yi Pan
7. Disable checkpoint in coordinator stream | Resolved | Yi Pan
8. Integrate CoordinatorStream to use SystemConsumers and SystemProducers | Open | Unassigned
9. Optimize CoordinatorStream's bootstrap mechanism | Open | Unassigned
10. Refactor Coordinator stream messages | Resolved | József Márton Jung
11. Explicit restart containers to pick up dynamic JobModel changes | Open | Alex Buck
Activity
Just for my education: are we talking about embedding a Kafka consumer inside the YARN AM?
The way I imagined this is that the AM will read the config from some Kafka topic (per Samza topology) which will then initialize (or modify) containers based on this ?
Do we also need the AM -> container communication mechanism, discussed in a previous ticket as part of this ? OR - in case of a config change, the AM simply destroys and re-creates the required containers ?
are we talking about embedding a Kafka consumer inside the YARN AM
Yes, though in a roundabout way. We'd probably make the interface pluggable, and then provide a YARN-based implementation. I don't think we've thought about this in great detail yet, but the short answer is that there could end up being a Kafka consumer inside the YARN AM process.
The way I imagined this is that the AM will read the config from some Kafka topic (per Samza topology) which will then initialize (or modify) containers based on this ?
If we follow this strategy, then we need to provide some RPC mechanism for each SamzaContainer to call back and get the config for itself. This could be done via HTTP, and I believe we already expose configs over HTTP in the YARN AM, so it should be pretty easy to add as a JSON blob.
An alternative strategy would be to have each SamzaContainer fully read the ConfigLog topic, and not coordinate directly with the AM.
One decision that needs to be made is whether the centralized coordinator should be the AM, or whether the SamzaContainers themselves should have some influence on restart decisions. It seems to me, at first glance, that having the containers call back to the AM to get their config, and having the AM be in complete control over when containers are restarted, is the most desirable way to do things. It means the containers can be completely dumb, and we have a single place to make all container-related decisions (the AM).
Do we also need the AM -> container communication mechanism, discussed in a previous ticket as part of this ? OR - in case of a config change, the AM simply destroys and re-creates the required containers ?
The simplest approach is to not have AM-to-container communication, and simply have the AM restart the containers when it needs to change them. The trade-off is that if your container has state, and you change config/restart the container, then you have to wait for the container to restore its state.
have each SamzaContainer fully read the ConfigLog topic, and not coordinate directly with the AM ... One decision that needs to be made is whether the centralized coordinator should be the AM
I think doing the config change through AM (and not bypassing it) seems better. Given that AM has more visibility (as you pointed out) - we can then leverage this to make more informed / intelligent decisions regarding container restarts.
The simplest approach is to not have AM-to-container communication
Agreed - I think simple restarts would just work.
I'm attaching a preliminary design doc. There are still some TODOs in the implementation section, but the rest of the document is pretty in-depth.
The goal is to get feedback.
It might be a good idea to have this as an interface named "ConfigState" (or something like that) instead of "ConfigStream", unless there's a specific feature of this that can only exist when it's backed by something similar to a Kafka stream (instead of Zookeeper, for example).
Along that line of thought, does this avoid using Zookeeper to avoid a new dependency? Or for some other reason?
The YARN AM exposes the Config object's data via an HTTP JSON webapp.
Is the intent for this API to allow changing the config without restarting the AM? If so, this might be a nice way to manage several Samza jobs with an external system (perhaps one with a nice web UI, like a Mesos meta-scheduler).
This refactoring would also help make Mesos support easier.
I definitely agree.
It might be a good idea to have this as an interface named "ConfigState" (or something like that) instead of "ConfigStream", unless there's a specific feature of this that can only exist when it's backed by something similar to a Kafka stream (instead of Zookeeper, for example).
We started by referring to this as state, but there was strong feedback from martinkl not to use the word "state" again, since it's already overloaded, and used for Samza's state management feature. The other idea was "ConfigLog".
Along that line of thought, does this avoid using Zookeeper to avoid a new dependency? Or for some other reason?
The two reasons for not using ZK are:
- Avoid adding a new dependency. Thus far, we've gotten by without it.
- If we use Kafka as the system, it lets us transactionally store offset checkpoints in the ConfigStream. If we have two systems (ZK, and Kafka), then we can no longer transactionally commit everything at once (offset checkpoint, output, state changelog, etc).
Regarding (2), it looks more and more like it will be a requirement for transactionality to have the ConfigStream and a job's output streams go to the same underlying system. It's not necessarily the case that only Kafka can support this. I believe HBase would work as well.
Is the intent for this API to allow changing the config without restarting the AM?
Yes, this is the intent. In its most naive form, the flow looks something like:
- AM updates config that's exposed via HTTP JSON API.
- AM kills all existing containers.
- AM brings up new containers.
- New containers query AM's HTTP JSON API, which has new configs.
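As a throwaway illustration of the last step, here is roughly what a container fetching its config from the AM might look like. The /config path, the flat key-value payload, and the class name are assumptions for the sketch, not the actual Samza API:
import java.net.URL;
import java.util.Map;
import org.codehaus.jackson.map.ObjectMapper;
public class ConfigFetchSketch {
  // Fetch the job's config from the AM's HTTP JSON endpoint and return it as a map.
  public static Map<String, String> fetchConfig(String amUrl) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.readValue(new URL(amUrl + "/config"), Map.class);
  }
  public static void main(String[] args) throws Exception {
    Map<String, String> config = fetchConfig("http://am-host:5678");
    System.out.println(config.get("task.inputs"));
  }
}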
If so, this might be a nice way to manage several Samza jobs with an external system (perhaps one with a nice web UI, like a Mesos meta-scheduler).
I hadn't considered this in detail, but it seems like it might work. I think as long as the HTTP JSON API is well defined, an individual SamzaContainer shouldn't care whether the API it's querying is managing just one job or many jobs.
This looks pretty good . +1
The HTTP JSON interface with a pull model sounds good to me ! This also makes it easy for the user to see what config is actually being used (a common problem in distributed systems).
My comments on the open questions:
- Multi writer problem: I think we can make the auto-config by the Samza AM - as a tunable property. This should be used when the user does not want to keep tuning the config. In addition, maybe it is better for the user to make any config related changes from a web based endpoint (maybe hosted in the AM). This way, the config hosted by the AM becomes the source of truth and not cfg2 (something similar to what Azkaban also does).
- Config stream naming:
Maybe we can still standardize this. The configure-job.sh script can take a job name for which a config stream is to be written. We can simply wait for Kafka topic deletion to become available in order to solve the problem of resetting the config.
Maybe we can still standardize this. The configure-job.sh script can take a job name for which a config stream is to be written. We can simply wait for Kafka topic deletion to become available in order to solve the problem of resetting the config.
Yea, I think I'm leaning this way now as well. Something like:
$ configure-job.sh --location kafka://localhost:10251 --job.name <job name> --job.id <job id> --property task.inputs=kafka.foo
$ run-job.sh --location kafka://localhost:10251 --job.name <job name> --job.id <job id>
I think we can make the auto-config by the Samza AM - as a tunable property. This should be used when the user does not want to keep tuning the config.
I was just thinking that we could ignore this problem. As long as we keep the job coordinator as the only thing that programmatically mutates configuration, then the race condition is between the coordinator and the human dev. I think this should be fine.
The offset checkpoint messages pose a bigger problem, though. Since they're checkpointed once per minute, the chance of an overwrite is relatively high in the case where a developer is trying to set all offsets back to 0, for example. If we suppose there are 64 StreamTasks, and each one is checkpointing once per minute, then you're getting roughly 1 checkpoint per second, on average. If the developer writes all 64 offset messages back to offset 0, it seems likely that a SamzaContainer might overwrite the developer's offset message with its own before the container is killed off and restarted. The order of events would be:
SamzaContainer: offset 3976
SamzaContainer: offset 4320
Developer: offset 0
SamzaContainer: offset 5320
Job coordinator: restarts SamzaContainer
SamzaContainer: starts with offset 5320
I don't have a good idea on how to solve this. One way would be to add some generation number that invalidates all future offsets from SamzaContainer after the developer's message is written. I haven't thought about this in great detail.
In addition, maybe it is better for the user to make any config related changes from a web based endpoint (maybe hosted in the AM). This way, the config hosted by the AM becomes the source of truth and not cfg2 (something similar to what Azkaban also does).
I was thinking the source of truth would be the underlying stream, since this is what the job coordinator will use to run the job. Whether the config is mutated from the AM web UI, or from a CLI, I haven't considered very much.
Maybe you're trying to get at the idea that we could try and funnel all mutations to the ConfigStream through a single writer?
I was thinking the source of truth would be the underlying stream, since this is what the job coordinator will use to run the job. Whether the config is mutated from the AM web UI, or from a CLI, I haven't considered very much. Maybe you're trying to get at the idea that we could try and funnel all mutations to the ConfigStream through a single writer?
The source of truth is the stream. However, if a lot of modifications are made (either manually or automatically), the user might lose track of what the exact config is. So yeah, funneling all mutations through a single writer (like the AM) might add value, so that:
- We can reflect the current config accurately (for example, if within LinkedIn the user only modifies the config via cfg2, then there's extra overhead in keeping that in sync with the actual config, since config mutations might also be done via the AM).
- Avoid all concurrency issues.
the user might lose track of what the exact config is
For this, I was thinking that configure-job.sh could have a --read switch, to get all existing configs for a job. I agree it's super useful to have the AM expose them as well, which we can continue to do.
Avoid all concurrency issues.
Isn't there still a concurrency issue if two writers update the AM UI at the same time?
We can reflect the current config accurately (for example, if within LinkedIn the user only modifies the config via cfg2, then there's extra overhead in keeping that in sync with the actual config, since config mutations might also be done via the AM).
For this kind of use case, I was figuring we'd have configure-job.sh behave a lot like run-job.sh does today: take a URI and a factory, and resolve configs. For example, something like:
$ configure-job.sh --uri kafka://localhost:1025 --job.name foo --job.id bar --config-file=file://... --config-factory=PropertiesConfigFactory
You could have configure-job.sh run against a static config file every time run-job.sh is run. This would essentially mirror how Samza currently works.
One other thought: if we depend on a UI (in YARN or otherwise), we get into a problem where we might need to edit config while the job is down (the UI is unavailable).
I haven't really fully baked any of this, but this is just along the lines of what I'm thinking right now. I think it's OK to live with concurrency issues for config, but for offsets, it could be problematic. I haven't spent much time thinking about how to fix that yet.
Here are my thoughts so far -
At the high level, I agree that all communication from the AM to the containers should happen through the AM API. All communication from the containers to the AM should happen through the stream (at least for now, until we identify soft state that the containers would want to communicate to the AM without persisting it). Multiple containers/tasks can write to this stream, and any race conditions that can occur due to multiple writers are present irrespective of the underlying storage medium.
1. AM catch up speed of config log
We plan to use the config log as the source of truth and use it in the future to make dynamic updates to configs. If we want the AM to take actions on the cluster immediately after config updates, it is important that the AM is always caught up with the config log. It would be useful to do some rough math on the potential size this log can grow to, assuming a worst-case cluster setup. We would turn on key deduplication in Kafka, which would ensure the log size is bounded, but having some estimate would still be useful.
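As a purely illustrative back-of-the-envelope (the numbers are assumptions, not measurements): if a job's containers write 1,000 checkpoint messages per minute at ~200 bytes each, that's about 200 KB/minute, or roughly 3.3 KB/s, of ConfigStream traffic; with key deduplication enabled, the retained log size is bounded by roughly (number of distinct keys) × (average message size), i.e. it grows with the number of SSPs and config keys rather than with time.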
2. AM startup time
This is kind of related to the previous point. On startup, the AM needs to read the entire config log from head and restore the job configuration and offset state. This could potentially add more time to the startup. Today, the offsets are individually restored by the tasks and if they have multiple partitions, the restoration can potentially happen faster. This largely again depends on how big the stream can grow.
3. AM failure and containers running
I am assuming that in the future we would like to have the containers keep running even when the AM fails. The containers would continue to write their offsets into the config log. However, if a container fails when the AM is down, it would not be able to start since it cannot get the offsets from the AM. If the AM is highly available, we can safely assume that a new AM will be chosen within a few seconds (AM start + config log restore) and the containers can proceed.
4. Mixing transactional and non transactional updates
It would be worth mentioning that by writing the offsets and the job configurations into the same topic, we would potentially mix transactional and non-transactional messages in the same topic. The transactional feature is required to ensure exactly-once semantics in Samza. The AM would need to use a transactionally aware consumer to ensure it reads the data in a consistent state.
5. Dynamic config updates
If we let dynamic config updates happen, we would need some kind of boundaries to declare when the AM can take action on the config changes. For example, you may want to change both the max memory size and the total number of containers, and we would like the AM to react to these changes once. One option would be to batch changes in the AM for some interval before acting on them. Another option would be to introduce the notion of batch config change messages: we would simply add a batch config message start header, followed by all the config changes, and then push a batch config message end header so the AM knows when it needs to act on the change.
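For illustration only (the field names are made up, not a proposed schema), the batch-delimited variant might put messages like these on the stream, with the AM buffering everything between the start and end markers and applying it as a single change:
{"type": "batch-start", "batch-id": "42"}
{"type": "set-config", "key": "yarn.container.count", "value": "8"}
{"type": "set-config", "key": "yarn.container.memory.mb", "value": "2048"}
{"type": "batch-end", "batch-id": "42"}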
It would be useful to do some rough math on the potential size this log can grow to, assuming a worst-case cluster setup.
Agreed. Other math we should do is figuring out how many containers we can have running if they're polling the job coordinator's HTTP server every N seconds.
However, if a container fails when the AM is down, it would not be able to start since it cannot get the offsets from the AM.
True. I've been thinking about this a bit more. I think this should be fine. In both Mesos and YARN, if the container fails, it actually won't be restarted anyway (since restarting the container requires a job coordinator to decide which partitions are assigned, which box the container should be on, etc). Even if the distributed execution framework were to restart the container, I think the desired behavior is to just block until the AM comes back, or to kill itself permanently, and wait for the AM to come back and restart it properly.
The AM would need to use a transactionally aware consumer to ensure it reads the data in a consistent state.
Good point. Related to this, if we implement the protocol in the ConfigStream as key-value (vs. an entire config blob as one value), then you might wish to use transactions to atomically write multiple key-value pairs together (all-or-nothing) into the ConfigStream. Again, this would require a transactional consumer.
I'll update the design docs with this feedback.
Regarding the client commands to create and modify Samza jobs (such as configure-job and run-job), it may be useful to review existing commands that perform a similar role:
- My personal favorite is Slurm's set of client commands, of which sbatch is probably the most relevant (http://www.schedmd.com/slurmdocs/sbatch.html).
- To go back a bit further in history, it might be a good idea to take a look at the POSIX qsub style command from PBS/Torque (http://docs.adaptivecomputing.com/torque/4-1-4/Content/topics/commands/qsub.htm). Moab's msub also follows this design.
Regarding a possible DSL for building configuration, it may be useful to look at Slurm's lua callback for job configuration (warning, GPLv2 code) https://github.com/SchedMD/slurm/blob/master/contribs/lua/job_submit.lua
Another idea would be to implement a configuration DSL using a scripting language like Python, which is both easy to implement and also allows you to embed Python code in your configuration script.
I prefer to have a declarative DSL such as the one used by Google's build system, Blaze. A more detailed example can be found in this GitHub Gist.
This would not be difficult to implement, since these statements are simply Python function calls; and since the DSL is valid Python code, it is also possible to have regular Python code in your configuration script. This way, after each statement is evaluated, the Samza client program can either compile it into JProperties (as a stop-gap solution) or turn it into a Kafka message and publish it to the configuration stream.
Of course, we can have both a command line program and a DSL, and I am pretty sure that as Samza takes off, people would want to start writing DSLs and clients for other languages as well. The key would be to make sure that the common interface the DSLs and tools talk to is solid.
I have opened SAMZA-416 to discuss the DSL further.
I've updated the design document.
- Updated job coordinator section.
- Added `control-job.sh` CLI examples.
- Added ConfigStream implementation and protocol sections.
- Added scalability estimations for configuration size, write throughput, and container heartbeats.
- Added multi-writer race condition and transactionality design section.
- Added Mesos impact section.
- Added YARN AM work-preserving restart impact section.
- Eliminated 'Open Questions' section.
The largest change was in the 'Design Proposal' section, which became much more detailed and complete. If you only have time to read one section, make it the 'Design Proposal' pages.
Some open questions with this proposal:
- ConfigStream seems like a bad name now. We're overloading the stream to do a lot more than just config.
- The existing proposal depends on the transactionality of the underlying SystemConsumer/SystemProducer. Without transactionality, offset checkpoints are no longer atomic. In Samza's existing implementation, even without transactionality, checkpoints are done atomically, per-task.
- How to handle defaults. One way would be to have the job coordinator dump all of its defaults into the ConfigStream the first time it runs. The other way would be to have the control-job.sh just show defaults as part of --help.
1. For the name, does JobState make sense? The work of this component would be to track job-level state (job config, offsets for all tasks in the job, partition mappings for all tasks in the job).
does JobState make sense
It does, but the word "state" is already a way-overloaded term with Samza. We're trying to avoid using it in more places.
This is a nice proposal, I like it a lot. Various thoughts on the design doc (first three are responses to your questions above):
- I think the name "ConfigStream" is actually ok. A checkpoint or task-to-changelog-partition assignment can quite reasonably be regarded as configuration (even though it arises as a side-effect of execution rather than from a config file). Maybe "metadata" rather than "config", but "config" is shorter.
- You say the atomicity of per-task checkpointing is broken, but I don't follow why. Isn't a checkpoint update still one message written to the ConfigStream?
- I think defaults should not be written to the stream, but rather filled in at runtime at the last possible moment when the configuration is read. If they were written to the stream, it would become impossible for the framework to change defaults in future (as the job cannot distinguish between a default written to the stream, which should be replaced by a new default, and an explicitly configured parameter, which should be honoured).
- Do I understand correctly that from a container's point of view, the config remains immutable once it has been fetched from the AM's HTTP endpoint? The container does not consume the ConfigStream, and there is no polling of the HTTP endpoint (except for hot standby as discussed in the doc)? I think that's good, because a mutable config within a container would require big changes to SystemProducers/Consumers etc.
- Were you imagining that every job would have their own ConfigStream, or could a single ConfigStream be shared by multiple jobs? (I think each having their own would be simpler and nicer)
- Should the config stream location URL include the job name? An early example in the design doc (kafka://localhost:10251/my-job) includes the job name, later ones do not.
- What's the difference between control-job.sh and configure-job.sh?
- +1 on explicit restart.
- Not wild on the proposed serialisation format for config messages (t=... k=... v=...). What about escaping spaces and equals signs within values? Better to just use JSON, IMHO. (If using JSON in the key of a message sent to Kafka, need to ensure there is a deterministic order of keys, so that compaction works.)
- Moving ConfigRewriter into the coordinator: interesting idea, not sure about the implications. What are the use cases for ConfigRewriter, besides expanding a regex for specifying input streams? e.g. it might be used for fetching config from an external configuration management system – in that case, moving ConfigRewriter may be intrusive, as the coordinator may not be able to access that external system.
- Regarding estimate of time to consume the ConfigStream, I should point out that "control-job.sh --list" will take 100 seconds too, which is not so great since it's an interactive command. However, most jobs will have vastly smaller config, so perhaps that's an edge case we can live with.
- "If 1000 Samza jobs were run in one Samza grid, 50 Kafka brokers would be required just to sustain the ConfigStream write rate" — you're talking 1000 jobs with 1000 containers each here, i.e. 1 million CPU cores. I hope we get to see Samza running at that scale one day
- The prospect of a work-preserving AM restart is nice.
- Will ConfigStream be a general-purpose mechanism for parts of the framework which need to remember some information across job restarts? The assignment of tasks to changelog partitions is one example of such a thing that needs to be durable, and I could imagine there might be more. For example, a MySQL binlog SystemConsumer may want to remember metadata about leader failover events in some durable location. This is not a concrete need yet, just an idea to keep in mind.
- How does the ConfigStream checkpointing interact with Kafka's own consumer offset management? Is the intention that Samza will eventually switch over to Kafka's offset management, or will Samza keep doing its own checkpointing indefinitely?
Maybe "metadata" rather than "config", but "config" is shorter.
Hmm. MetadataStream. That might be technically more accurate and descriptive. I agree "config" might be construed as technically correct, but I'm a little worried that it's not really what people think of when they hear the word "config".
You say the atomicity of per-task checkpointing is broken, but I don't follow why. Isn't a checkpoint update still one message written to the ConfigStream?
Currently, when a StreamTask's offsets are committed, they're sent as a single message to Kafka. In the new proposal, they would be sent as a series of messages: one for each input SSP. I suppose "non-transactional" might be a bit more correct. The difference is that a container can fail half-way through a single StreamTask's offset commit in the new proposal, but not in our current implementation.
I think defaults should not be written to the stream, but rather filled in at runtime at the last possible moment when the configuration is read.
This needs to be thought through a bit, I think. The main concern was that the control-job.sh script might be a different version than what the job is running. In such a case, you (the developer) might wish to know what value is being used for config "X", but the default of the Samza version that control-job.sh is running might be different from the version that the job is running on.
Perhaps a simple query of the AM's HTTP JSON web service would be a better solution, though this would require the job being up and running to fetch config defaults.
Do I understand correctly that from a container's point of view, the config remains immutable once it has been fetched from the AM's HTTP endpoint? The container does not consume the ConfigStream, and there is no polling of the HTTP endpoint (except for hot standby as discussed in the doc)?
Correct.
Were you imagining that every job would have their own ConfigStream, or could a single ConfigStream be shared by multiple jobs? (I think each having their own would be simpler and nicer)
I had a note about this in one of my drafts, but I deleted it. The current proposal is one ConfigStream per-job. I couldn't come up with a good reason to have multiple jobs share a ConfigStream.
Should the config stream location URL include the job name? An early example in the design doc (kafka://localhost:10251/my-job) includes the job name, later ones do not.
I went back and forth on this. The trade-offs between the two approaches (at least, as I see it) are listed in the proposal. I opted for the simpler (from a dev's perspective) approach in the latest proposal.
What's the difference between control-job.sh and configure-job.sh?
They are the same thing. I started calling it configure-job.sh, but I've switched to calling it control-job.sh, since it might do things like restart the job, etc. Looks like I missed a few renames in the latest design doc. Everything should read control-job.sh.
Not wild on the proposed serialisation format for config messages (t=... k=... v=...). What about escaping spaces and equals signs within values? Better to just use JSON, IMHO. (If using JSON in the key of a message sent to Kafka, need to ensure there is a deterministic order of keys, so that compaction works.)
Oops, maybe I wasn't explicit enough. The current proposal is to use JSON for both keys and values. I think that I just used shorthand notation at various points in the docs. Agree we'll need to define an ordering for the keys, which is a bit odd/error prone. I'm not sure of a good way around this.
Regarding estimate of time to consume the ConfigStream, I should point out that "control-job.sh --list" will take 100 seconds too, which is not so great since it's an interactive command. However, most jobs will have vastly smaller config, so perhaps that's an edge case we can live with.
Yea, it's kind of annoying, but I can live with it, I think. This estimate is pretty pessimistic, though, in terms of job size. One other short-circuit that we could employ would be to have the control-job.sh script start by trying to query the AM's HTTP JSON server (though this would require the control-job.sh script to somehow know the AM's host:port).
Will ConfigStream be a general-purpose mechanism for parts of the framework which need to remember some information across job restarts?
Hmm. I hadn't thought about this in great detail. It seems like it'd be useful to provide a "write config" facility to pluggable parts of the framework, like SystemConsumers, as you've said. Given a good implementation of the ConfigStream, it seems possible to expose it.
How does the ConfigStream checkpointing interact with Kafka's own consumer offset management? Is the intention that Samza will eventually switch over to Kafka's offset management, or will Samza keep doing its own checkpointing indefinitely?
The current proposal is to stay off of Kafka's offset management, and just use our own. Technically, we'll still have to interact with Kafka's, since that's where transactions are managed, but we won't store any offsets in it; we'll just tell it to commit transactions. I haven't thought about this in great depth, but my gut reasoning is that it's best to stay away from depending directly on Kafka for things like offset checkpoints, especially if there's the possibility of non-Kafka offsets needing to be checkpointed. It could also cause another bifurcation between the way offsets are stored and the way other config is stored.
In the new proposal, they would be sent as a series of messages: one for each input SSP.
Ah yes, I missed that. My hunch is that it's not a big problem, since state changelogs are currently also not atomically tied to the checkpoints either, so the semantics of container restart are somewhat vague already. And we generally don't give any guarantee regarding cross-SSP coordination. Once we get Kafka transactions, these things can all be tied together atomically.
Alternatively, would it be an option to continue writing checkpoints in the current form (per-StreamTask rather than per-SSP)? I don't see why the change to a MetadataStream forces us to use per-SSP checkpoint messages.
The main concern was that the control-job.sh script might be a different version than what the job is running. In such a case, you (the developer) might wish to know what value is being used for config "X", but the default of the Samza version that control-job.sh is running might be different from the version that the job is running on.
I think it's quite reasonable to expect that if you want to know the 'interpreted' value of a config property (after default is filled in, and perhaps also after it is parsed or otherwise processed) then you need to ask the AM, either via the web interface or via a command-line tool that talks to an API. Then control-job.sh is concerned only with the config that is explicitly declared, and not what is inferred.
If you want control-job.sh to also include docs (--help), you have the same version mismatch problem: if the job and the command-line tool are running different versions of Samza, the meaning of some of the properties may have changed.
The current proposal is to use JSON for both keys and values. I think that I just used shorthand notation at various points in the docs. Agree we'll need to define an ordering for the keys, which is a bit odd/error prone. I'm not sure of a good way around this.
Ok. An alternative would be to use a serialization format with a canonical representation (most binary serialization formats would qualify: Avro, Protocol Buffers, ASN.1 DER, etc), but that makes it less convenient if you want to inspect the stream using kafka-console-consumer. I don't have a strong opinion either way.
I haven't thought about this in great depth, but my gut reasoning is that it's best to stay away from depending directly on Kafka for things like offset checkpoints, especially if there's the possibility of non-Kafka offsets needing to be checkpointed.
Agree on the gut reasoning, though I haven't thought about it in great depth.
Ok. An alternative would be to use a serialization format with a canonical representation (most binary serialization formats would qualify: Avro, Protocol Buffers, ASN.1 DER, etc), but that makes it less convenient if you want to inspect the stream using kafka-console-consumer. I don't have a strong opinion either way.
It's REALLY handy for developers to be able to inspect/debug using the kafka-console-consumer.sh. Maybe you can use this annotation to force ordering?
http://jackson.codehaus.org/1.6.5/javadoc/org/codehaus/jackson/annotate/JsonPropertyOrder.html
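A rough sketch of what that could look like, using the Jackson 1.x annotation linked above; the class and field names are illustrative, not the actual Samza message schema:
import org.codehaus.jackson.annotate.JsonPropertyOrder;
import org.codehaus.jackson.map.ObjectMapper;
@JsonPropertyOrder({"type", "key", "version"})
public class ConfigMessageKey {
  private final String type;
  private final String key;
  private final int version;
  public ConfigMessageKey(String type, String key, int version) {
    this.type = type;
    this.key = key;
    this.version = version;
  }
  public String getType() { return type; }
  public String getKey() { return key; }
  public int getVersion() { return version; }
  public static void main(String[] args) throws Exception {
    // Properties serialize in the declared order, so equal keys always produce
    // identical bytes, which is what Kafka log compaction needs.
    System.out.println(new ObjectMapper().writeValueAsString(new ConfigMessageKey("set-config", "task.inputs", 1)));
  }
}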
I haven't been following along this discussion in detail, so this may be a non-issue. However, it might be a good idea to consider Amazon AWS Kinesis (very similar to Kafka) in this discussion. Of note is that Kinesis has a 50KB message size limit.
Known remaining issues with the proposed design:
- Message payload format K:V, vs. K: {K:V, ...}
- How does the control-job.sh script use the SystemConsumer/SystemProducer?
- How will this work in a dev environment?
I will address these in order.
Message payload format
The current design models ConfigStream messages as a simple key-value pair. The downside to this approach is that it breaks atomicity for a StreamTask's checkpoint (multiple messages are required for a single checkpoint--one per SSP:offset pair).
The two solutions to this are to (1) depend on transactionality, or (2) support a message payload format that is nested (K: {K:V, ...}). All offset checkpoints for a single task could therefore be written in a single message, thus maintaining atomic commits for all checkpoints within a single task. The latter approach (nested payloads) is how we currently checkpoint. The downsides to this approach are:
- The single offset checkpoint message will be much larger than any individual offset checkpoint message in approach (1).
- Modifying an offset checkpoint requires the job coordinator to do a read-modify-write, which is more complicated than the simple put that would be required for approach (1).
- It muddles the data model a little bit.
The problem with (1) is mainly that it depends on transactionality. Without this, there's the potential for a failure to occur halfway through a task checkpoint. In such a case, some input streams would fall back, and others would not. I tend to agree with Martin's assessment of the problem:
My hunch is that it's not a big problem, since state changelogs are currently also not atomically tied to the checkpoints either, so the semantics of container restart are somewhat vague already.
But given that it should be fairly trivial to solve this using nested payloads, we might as well do so. We can always clean it up later, if transactionality becomes commonplace.
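To make the two shapes concrete, here is an illustrative (not final) rendering of the same checkpoint in each format, with made-up task and SSP names:
Approach (1), one message per SSP:
key: {"type": "checkpoint", "task": "TaskName-0", "ssp": "kafka.PageViewEvent.3"}
value: {"offset": "5320"}
Approach (2), one nested message per task:
key: {"type": "checkpoint", "task": "TaskName-0"}
value: {"kafka.PageViewEvent.3": "5320", "kafka.PageViewEvent.7": "4012"}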
How does the control-job.sh script use the SystemConsumer/SystemProducer?
This is a tricky one. Given that Samza has a SystemConsumer/SystemProducer API, it seems ideal to have the ConfigStream implementation use these interfaces for reading/writing config. In the design document, I glossed over how the job coordinator and control-job.sh script know how to translate a URI to a Config for SystemConsumer/SystemProducer. This is a bit of a chicken and egg problem. The control-job.sh script needs to know how to write to the ConfigStream, but in order to do that, it needs config for the SystemFactory.getConsumer() call.
Two potential solutions that I can think of are:
- Introduce a SAMZA_HOME environment variable, which expects a conf/samza-site.properties configuration.
- Add a SystemFactory.getConfig(URI uri) interface.
Introducing a SAMZA_HOME environment variable seems very heavy handed. It's going to have to be set on every node in the YARN cluster (since the job coordinator could run on any node), as well as the machine that control-job.sh is going to run on. This will be hard to operate, may be (Samza) version dependent, and seems kind of clunky.
Adding a getConfig() API seems mildly hacky. The main problem with this approach is how to determine which SystemFactory to use based on the URI. We could do something as simple as Class.forName(uri.getScheme() + "SystemFactory").newInstance(). This seems a bit hacky and dangerous, but should work, and maintains pluggability.
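For what it's worth, a minimal sketch of the scheme-to-factory idea (purely illustrative; a real version would probably map the scheme to a fully-qualified class name, or to built-in defaults for file:// and kafka://):
import java.net.URI;
public class SchemeFactoryResolver {
  // Derive a SystemFactory from the ConfigStream URI's scheme, e.g. "kafka" -> "kafkaSystemFactory".
  public static Object factoryFor(String location) throws Exception {
    URI uri = new URI(location);
    return Class.forName(uri.getScheme() + "SystemFactory").newInstance();
  }
}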
Does anyone else have any other ideas for this?
How will this work in a dev environment?
It's relatively easy to start a Samza job locally using the ThreadJobFactory or ProcessJobFactory right now. Config can be inserted via the constructor, and no Kafka grid is required to do this. In the new design proposal, it seems that developers will be required to have a Kafka grid (or some equivalent system implementation--hbase, or whatever) to store their configuration. There doesn't seem to be much of a way around this, unless the FileSystemConsumer/FileSystemProducer could be made to work as the backing system for the ConfigStream (which seems possible at first glance).
We could do something as simple as Class.forName(uri.getScheme() + "SystemFactory").newInstance(). This seems a bit hacky and dangerous, but should work, and maintains pluggability.
Rather than this, we could just add an extra switch to the CLI to provide a system factory, but provide built-in defaults for URIs with the file:// and kafka:// schemes. This seems a bit less hacky, and should still work out of the box, for most folks.
In addition, because the control-job.sh script and the job coordinator will both want to read and write to the ConfigStream, we'll have to provide both the broker metadata list and the ZK path in the URI. This can probably be done with something like:
kafka://<broker-list>:<broker ports>?zk=<zk-list>:<zk-port>
kafka://192.168.0.1,192.168.0.2:9192?zk=192.168.0.1,192.168.0.2,192.168.0.3:2181
It's ugly, but it's not our fault. Neither of these systems are represented well in URI schemes.
Eventually, the need for a ZK path should go away, when Kafka finishes moving all of its ZK dependencies behind a broker protocol.
I see the work for this ticket breaking down into three main phases:
- Add ConfigStream to existing JobRunner/AM code, and add HTTP API to AM/SamzaContainer communication.
- Extract job coordinator logic out of the Yarn AM/ThreadJobFactory/ProcessJobFactory, and into a single centralized execution-framework agnostic chunk of code.
- Write the control-job.sh script to manipulate the config stream.
I propose ripping (2) and (3) out into separate tickets, and just focusing on implementing the ConfigStream (1) in this ticket.
support a message payload format that is nested (K: {K:V, ...})
Do you have an estimate of how big these payloads are likely to get? Thinking about jonbringhurst's remark about the Kinesis message size limit.
Apart from that, I agree with your observations about the advantages and disadvantages of each message format. I mildly prefer the K:V format as it's a bit cleaner, and transactionality is much needed anyway. But I'm ok either way.
Adding a getConfig() API seems mildly hacky. The main problem with this approach is how to determine which SystemFactory to use based on the URI.
We could define that the scheme is the name of a system defined in the job configuration. So in order to use a kafka:// URI, you must include a systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory in the job configuration. That configuration parameter would need to be provided as a command-line argument.
Sensible defaults (like defining file:// and kafka:// by default) would then be a nice usability improvement without sacrificing generality.
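For illustration, borrowing the switch names from the earlier configure-job.sh example (none of these flags exist yet), the explicit form might look something like this, with kafka:// and file:// falling back to built-in factories when the property is omitted:
$ control-job.sh --location kafka://localhost:10251 --job.name foo --job.id 1 --property systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory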
How will this work in a dev environment?
I think it would be very good if the Kafka runtime dependency is optional, so that Samza remains friendly to jobs which choose to use a different message broker. So I would be keen for Samza to be able to use files for checkpoints and config in dev.
kafka://<broker-list>:<broker ports>?zk=<zk-list>:<zk-port>
Nit: Kafka's configuration repeats the port for each ZK IP address, rather than specifying one port for all IP addresses. Might be better to stick with the same convention.
I propose ripping (2) and (3) out into separate tickets, and just focusing on implementing the ConfigStream (1) in this ticket.
+1
Do you have an estimate of how big these payloads are likely to get?
They will get above 50K for jobs with a lot of input SSPs/task. In general, they shouldn't be that large, though.
I mildly prefer the K:V format as it's a bit cleaner, and transactionality is much needed anyway.
Personally, I agree. I think we should switch to K/V once we have transactionality, but in the meantime, I'm just keeping the proposal in line with what we already do (single message for all offsets for a single task).
We could define that the scheme is the name of a system defined in the job configuration.
This seems to be the cleanest/most general way to do things. It seems not too great from a usability perspective, but if we provide defaults for the file/kafka systems, then it should be OK.
I think it would be very good if the Kafka runtime dependency is optional.
I agree.
So I would be keen for Samza to be able to use files for checkpoints and config in dev.
Sounds reasonable to me.
How does the control-job.sh script use the SystemConsumer/SystemProducer?
A third solution to this problem would be to define the ConfigStream in code. If we move toward doing wiring in code (rather than config) as part of SAMZA-40, we could also define the ConfigStream in code. At first, this might sound rather distasteful, since it will require a full rebuild of code in order to change a ConfigStream, but if you bend your thinking a bit, the ConfigStream is actually closer to wiring than config, so it might be OK to require this.
This would also require the control-job.sh script to download the job that it's controlling, or to have the job's jars on its classpath, so that it would know how to talk to the ConfigStream as well.
I like this approach, except for the complexity it introduces in control-job.sh. Not sure if we can solve this, though. If not, then SystemFactory.getConfig still seems the least distasteful.
A third solution to this problem would be to define the ConfigStream in code.
I can't visualise what this would look like. Could you give an example?
The example would depend on how wiring is handled as part of SAMZA-40. For argument's sake, let's say we had a SamzaJobBuilder class, where you could do something like:
Properties props = new Properties()
// Set Kafka consumer/producer properties
KafkaConfigStream configStream = new KafkaConfigStream(new KafkaConfig(props))
SamzaJob job = new SamzaJobBuilder()
  .setConfigStream(configStream)
  .setFoo(foo)
  .build
Another variation on this is:
def getJob(config: Config) = {
  new SamzaJobBuilder()
    .setConfigStreamFactory(new KafkaConfigStreamFactory)
    .setFoo(foo)
    .build
}
The main point is just that the config stream would be set in code.
I think it's quite reasonable to expect that if you want to know the 'interpreted' value of a config property (after default is filled in, and perhaps also after it is parsed or otherwise processed) then you need to ask the AM, either via the web interface or via a command-line tool that talks to an API. Then control-job.sh is concerned only with the config that is explicitly declared, and not what is inferred.
1) When you guys talk about "default", does it mean the default configuration provided by Samza, or by the user when they start the job the first time?
2) There is another use case where the user wants to check the latest configuration used in a finished job and probably reuse the same configuration. Querying the AM won't work then, because the job is already finished. The only way of getting the latest configuration of a finished job is reading the whole ConfigStream. Or we could dump the latest configuration into a file when the job finishes? But the problem is that, most of the time, the job is killed rather than finished gracefully by itself.
So in order to use a kafka:// URI, you must include a systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory in the job configuration. That configuration parameter would need to be provided as a command-line argument.
I do not quite get this. Do you mean something like
kafka://<broker-list>:<broker ports>?zk=<zk-list>:<zk-port>
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
If yes, why can't this be a default setting when the URI starts with "kafka://"?
This seems to be the cleanest/most general way to do things. It seems not too great from a usability perspective, but if we provide defaults for the file/kafka systems, then it should be OK.
+1. When we have other systems, we could add more. This should not scare people away.
How does the control-job.sh script use the SystemConsumer/SystemProducer?
Maybe I misunderstood something. When we use control-job.sh to control the job, do we still have the properties file? If not, is it possible for control-job.sh to accept a properties file to get the system information? Because the main goal of the ConfigStream is to solve problems such as dynamically changing config and larger configuration sizes, not to get rid of the properties file. If we do, why can't we put the system information in the properties file?
Also, adding a --file for control-job.sh may be helpful, like
control-job.sh --location kafka://localhost:10251 --file /path/to/config/file
because when you start the job for the first time, you may need to input a lot of configuration. Having a file containing the starting properties is useful.
1) When you guys talk about "default", does it mean the default configuration provided by Samza, or by the user when they start the job the first time?
We're talking about the "default" value that's used when a developer has not specified a value for a given config. This usually happens in the code when we do a .getOrElse() on a config object.
2) There is another use case where the user wants to check the latest configuration used in a finished job and probably reuse the same configuration. Querying the AM won't work then, because the job is already finished. The only way of getting the latest configuration of a finished job is reading the whole ConfigStream. Or we could dump the latest configuration into a file when the job finishes? But the problem is that, most of the time, the job is killed rather than finished gracefully by itself.
Yea, I think you'd have to read the whole config stream.
If yes, why can't this be a default setting when the URI starts with "kafka://"?
I agree, we should just default it to the proper system factory for known system types (kafka, file).
Maybe I misunderstood something. When we use control-job.sh to control the job, do we still have the properties file?
No.
If not, is it possible for control-job.sh to accept a properties file to get the system information?
It is possible, but it seems kind of hacky. Following this train of thought, the control-job.sh could receive a properties file with a system defined for the ConfigStream. The problem is that the coordinator (AM) will also need this file, and it'll be running elsewhere. This leads to the same problem we have today: how do you get the file to the AM? JSON encoded environment variable, HDFS, HTTP, local file system?
I feel like this idea inevitably leads to one of two outcomes:
- A system like we have today, where we pass the static properties file between machines (in our case, via environment variable).
- The properties file is placed on every host, and an environment variable is defined so that control-job.sh and the job coordinator (AM) can both find it.
Because the main goal of the ConfigStream is to solve problems such as dynamically changing config and larger configuration sizes, not to get rid of the properties file.
True. I just want to make this as easy to use as possible. Having the kafka:// URI, with default system factories seems more usable than another properties file.
Also, adding a --file for control-job.sh may be helpful
We will certainly need a way to pipe config files into the ConfigStream. Something like --file makes a lot of sense.
True. I just want to make this as easy to use as possible. Having the kafka:// URI, with default system factories seems more usable than another properties file.
Totally agree. When users define a new system, they may need to pass a lot of initial configs into the control-job.sh. But I guess, when we can pipe config files, it may not be a big deal because they can put the new system's configuration in a file.
We will certainly need a way to pipe config files into the ConfigStream.
Another reason it is useful is that, if we allow control-job.sh to pipe config files, we will have an easier life debugging a Samza job with the same configuration:
1. start a samza job with initial configurations.
2. change configurations.
3. kill the samza job.
4. dump the ConfigStream into a config file.
5. start a new samza job with the same configuration.
Step 4 gives three advantages: 1) it gives an intuitive view of what was used in the latest Samza job, 2) it speeds up the starting process because the AM does not need to read the whole ConfigStream again in step 5, and 3) it makes it easy to make changes in the config file.
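Sketching that workflow with the switches proposed earlier in this thread (--read and --file); the exact flags are still hypothetical:
$ control-job.sh --location kafka://localhost:10251 --job.name foo --job.id 1 --read > foo.properties
$ control-job.sh --location kafka://localhost:10251 --job.name foo --job.id 1 --file foo.properties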
I am taking this up. Focus will be on adding an HTTP JSON server to the AM/local job factories, and writing the config to the stream.
I am taking a very incremental approach to this ticket. I've broken the work down into:
- Convert the AM:SamzaContainer communication to be HTTP/JSON.
- Convert the job:coordinator (AM, process job, thread job) communication to be ConfigStream.
- Extract shared logic between AM, process job, and thread job into single job coordinator.
- Write a control-job.sh script, and remove run-job.sh.
I've opened up SAMZA-438 for (1), and posted a draft patch to it. I'm looking for feedback.
Hi, referring to Chris's comment before: https://issues.apache.org/jira/browse/SAMZA-348?focusedCommentId=14134172&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14134172, we had a discussion on the potential race condition problem with the checkpoint messages from a container. The specific issue in the discussion: when a new container is restarted, how does the Samza AM know that it has consumed all checkpoint messages from the container's last run? I think that the following solution would be a cheap and attractive one:
1. The Samza AM always assigns a monotonically increasing generation # to each invocation of a specific container.
2. Each container will now associate its current generation # with the checkpoint messages published to the ConfigStream.
3. When a container failure is detected, the Samza AM increments the generation # and bounces the container.
4. On restart, the container will first publish a start token with its current generation # to the ConfigStream.
5. Now, the Samza AM receiving messages from the ConfigStream can make the following decisions:
a. On reception of the new generation # of the container, the Samza AM knows that it has consumed all previous checkpoints and recovered the state. Hence, it can start serving the config to that new generation of the container via the HTTP API.
b. If there is any issue (e.g. a network partition between the Samza AM and the container) that makes the Samza AM "think" the container has failed and it restarts a new one, the checkpoint messages from the still-running old generation of the container can now be safely discarded after receiving the start token from the new generation of the same container.
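A rough sketch of the AM-side bookkeeping in steps 3 and 5 (hypothetical class and method names, just to make the idea concrete):
import java.util.HashMap;
import java.util.Map;
public class GenerationFilter {
  // Current generation the AM has assigned to each container.
  private final Map<String, Integer> currentGeneration = new HashMap<String, Integer>();
  // Step 3: bump the generation when the AM bounces a container.
  public void onContainerRestart(String containerId) {
    Integer current = currentGeneration.get(containerId);
    currentGeneration.put(containerId, current == null ? 1 : current + 1);
  }
  // Step 5b: discard checkpoint messages written by an older generation of this container.
  public boolean accept(String containerId, int messageGeneration) {
    Integer current = currentGeneration.get(containerId);
    return current == null || messageGeneration >= current;
  }
}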
Please review and comment to see whether there are other issues I may have missed.
Thanks!
Hi criccomini, I am interested in using explicit restarts as described in the design document.
I have had a go at implementing this locally by updating the ConfigManager to listen for restart messages and I would love to contribute. If this is ok, please would you create a subtask for me to do this?
I have also found a bug with the json deserialisation in the YarnUtil class that is used by ConfigManager to query the yarn webapp api for all the running applications so I would like to contribute a fix for that as well please. Should I email the dev mailing list about this or could you create a jira for that too?
Thank you.
Hi, alex.buck10, I have created the sub-task SAMZA-921 and assigned it to you. There have been many updates regarding the CoordinatorStream and the JobCoordinator recently. Please check SAMZA-448 for the implementation of the CoordinatorStream. This might also be related to the refactoring of the JobCoordinator that we are actively working on: SAMZA-881. So, it would be good if the design/implementation of dynamic re-config via restart could be compatible with the refactored JobCoordinator as well.
As for the bug you found in JSON deserialization, feel free to open a JIRA. Everyone should have the ability to open JIRAs in Samza.
Thanks a lot!
We are discussing storing checkpoints in the ConfigLog. The way we handle checkpoints might change when we want to leverage Kafka's proposed transactionality feature:
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
I believe that this feature requires us to store checkpoints in Kafka, since the offset commit and transaction commit must happen atomically. I need to re-read the design doc to refresh my memory, but if this holds true, then the ConfigLog wouldn't be useful for storing Kafka offsets. It might still be useful for other systems (e.g. file system), though.