Hadoop YARN
  YARN-1530

[Umbrella] Store, manage and serve per-framework application-timeline data

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      This is a sibling JIRA for YARN-321.

      Today, each application/framework has to store, manage, and serve its per-framework data all by itself, as YARN doesn't have a common solution. This JIRA attempts to solve the storage, management and serving of per-framework data from various applications, both running and finished. The aim is to change YARN to collect and store data in a generic manner, with plugin points for frameworks to do their own thing w.r.t. interpretation and serving.

      1. application timeline design-20140108.pdf
        82 kB
        Vinod Kumar Vavilapalli
      2. application timeline design-20140116.pdf
        86 kB
        Billie Rinaldi
      3. application timeline design-20140130.pdf
        91 kB
        Billie Rinaldi
      4. application timeline design-20140210.pdf
        93 kB
        Billie Rinaldi
      5. ATS-meet-up-8-28-2014-notes.pdf
        80 kB
        Sangjin Lee
      6. ATS-Write-Pipeline-Design-Proposal.pdf
        71 kB
        Robert Kanter

        Issue Links

          Activity

          Vinod Kumar Vavilapalli created issue
          Vinod Kumar Vavilapalli made changes - Link: This issue is related to YARN-321
          Vinod Kumar Vavilapalli added a comment -

          Working on a design doc that explains the requirements and the solution space. I hope to push it out soon.

          Vinod Kumar Vavilapalli added a comment -

          Attached is the design doc.

          Wrote this after co-design & discussions with Arun C Murthy, Billie Rinaldi, Gopal Vijayaraghavan, Hitesh Shah, Mayank Bansal and Zhijie Shen.

          This is very much an evolving document.

          Feedback welcome!

          Lohit Vijayarenu added a comment -

          We might also have to think about the data transfer rate at the REST endpoint from all AMs/containers if this is hosted by the ResourceManager. One idea could be to have the REST endpoint be a library which any AM can inherit. When the AM initializes, this library can init the REST endpoint, which can then push events to pluggable storage (HDFS/Kafka ...). This might be similar to how the AM writes history events to HDFS today. This should give good scalability without changing much from an API perspective.
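
          A minimal sketch of what such an AM-side library with a pluggable storage backend could look like (all names below are hypothetical illustrations, not an existing YARN API):

            // Hypothetical sketch (two separate source files in practice): an AM-side
            // endpoint that accepts timeline events, e.g. from an embedded REST servlet,
            // and forwards them to whatever pluggable backend the cluster has configured.
            import java.io.IOException;

            public interface TimelineEventSink {
              void putEvent(String entityId, String eventType, byte[] payload) throws IOException;
              void close() throws IOException;
            }

            public class AMTimelineEndpoint {
              private final TimelineEventSink sink;   // e.g. an HDFS-, Kafka- or REST-backed sink

              public AMTimelineEndpoint(TimelineEventSink sink) {
                this.sink = sink;
              }

              // Called by the embedded REST handler, or directly by AM/container code.
              public void putEvent(String entityId, String eventType, byte[] payload)
                  throws IOException {
                sink.putEvent(entityId, eventType, payload);
              }
            }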

          Vinod Kumar Vavilapalli added a comment -

          So, the local writer API for AMs/containers is always REST to a local library? That'd mean we have to start one more (private) proxy server inside the library inherited by the AM/container?

          Lohit Vijayarenu added a comment -

          Yes, a proxy server inside the library, but only in the AM, not in containers. Containers could make REST calls to the AM. The main advantage is that we would not send timeline data to one single server. For example, we have seen cases where our history files could grow up to 700MB for large jobs. In that case, hundreds of such jobs would easily become a bottleneck for a single REST endpoint; distributing the load to each job's own AM would help.

          Robert Joseph Evans added a comment -

          I agree that we need to think about load and plan for something that can handle at least 20x the current load but preferably 100x. However, I am not that sure that the load will be a huge problem at least for current MR clusters. We have seen very large jobs as well, but 700 MB history file job does not finish instantly. I took a look at a 3500 node cluster we have that is under fairly heavy load, and looking at the done directory for yesterday, I saw what amounted to about 1.7MB/sec of data on average. Gigabit Ethernet should be able to handle 15 to 20 times this (assuming that we read as much data as we write, and that the storage may require some replication).
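
          As a back-of-envelope check of that headroom claim (my numbers, assuming roughly 3x storage replication and a read volume equal to the write volume, per the parenthetical above):

            \[
            1\ \text{Gbit/s} \approx 125\ \text{MB/s}, \qquad
            \frac{125\ \text{MB/s}}{(1 + 3) \times 1.7\ \text{MB/s}} \approx 18
            \]

          i.e. on the order of the "15 to 20 times" quoted.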

          I am fine with the proposed solution by Lohit Vijayarenu so long as the history service always provides a restful interface and the AM can decide if it wants to use it, or go through a different higher load channel. Otherwise non-java based AMs would not necessarily be able to write to the history service.

          I am also a bit nervous about using the history service for recovery or as a backend for the current MR APIs if we have a pub/sub system as a link between the applications and the history service. I don't think it is a show stopper, it just opens the door for a number of corner cases that will have to be dealt with. For example, if an MR AM crashes badly and the client goes to the history service to get the counters etc., how does the history service know that all of the events for the MR AM have been processed so it can return those counters, or perhaps other data? I am not totally sure what data may be a show stopper for this, but the lag means all applications have to be sure that they don't use the history service for split-brain problems or things like that.

          Billie Rinaldi added a comment -

          Here is an updated design doc with some modifications to the GET and PUT requests. This attempts to modularize the capabilities of the API and make it easier to work with.

          Chris Riccomini added a comment -

          Had a look at the design doc. I think I'm starting to get it, but I have a couple of questions:

          1. Is the expectation that people will be able to use this for INFO, WARN, ERROR type application logging?
          2. Regarding app-specific UIs, is it going to be possible to embed app-specific UIs within YARN's UI, instead of having to run an app-specific web UI? There is some mention of JS UIs, but it's a little unclear whether this would be embedded in YARN, or served from "somewhere" (to quote the doc). If it's served from the RM (or some other web UI in YARN), will it be up to ops to decide which libraries are embedded, or up to
          3. What are the planned "out of the box implementations" for both the storage and transport layers? REST+LevelDB+HBase? Are Flume and Kafka implementations expected to happen outside of the YARN project?

          Vinod Kumar Vavilapalli added a comment -

          Thanks for the comments everyone. Some responses follow:

          Lohit

          Yes, proxy server inside library, but only in AM not containers.

          Yes, that's a good idea and in line with what I might have hinted at in the doc. When containers also start writing out events, we need to have these intermediate aggregators for scalability. I was thinking NodeManagers, but your idea of AMs is better. If we make everyone depend only on a library, the underlying protocol can be either RPC or REST, together with an RPC server or a proxy like you mentioned. If REST is itself the first-class API, yes, a proxy server is the way.

          Bobby,

          I am also a bit nervous about using the history service for recovery or as a backend for the current MR APIs if we have a pub/sub system as a link between the applications and the history service.

          Agreed. This is a problem that exists even today with MR apps in a way with HDFS acting as a pubsub system. We've seen the corner cases you are hinting at even with the HDFS based history files. I think we now understand enough about these edge cases. I haven't yet made the jump to making these events be the base for recovery, but points noted for when we wish to.

          Chris,

          1. Is the expectation that people will be able to use this for INFO, WARN, ERROR type application logging?

          Application logging is not supposed to go through this channel. This feature is fundamentally for meta-event information - frankly, the kind we log in MR JobHistory files. We already have the ability for containers to write logs to a local log directory, and the corresponding features for aggregation and rendering. That is a high-throughput path that IMO cannot be sustained by the solution of this JIRA. I'll make it clear in the doc.

          2. Regarding app-specific UIs, is it going to be possible to embed app-specific UIs with YARN's UI, instead of having to run an app-specific web-ui? There is some mention of JS UIs, but it's a little unclear whether this would be embedded in YARN, or served from "somewhere" (to quote the docs. If it's served from the RM (or some other web-ui in YARN), will it be up to ops to decide which libraries are embedded, or up to

          I thought I wrote it clearly; I will have to reread and edit if necessary. The idea is to either let users host their UIs elsewhere or give admins the ability to install UIs (that they have vetted for stability/security etc.) in the HistoryServer itself.

          3. What are the planned "out of the box implementations" for both the storage and transport layers? REST+LevelDB+HBase? Are Flume and Kafka implementations expected to happen outside of the YARN project?

          We are planning Library(wrapping REST)+LevelDB+LocalFS for smaller deployments and out of the box. We will then move to working on Library(wrapping REST)+LevelDB+HBase for larger deployments.

          The discussion of Flume/Kafka is related to the event aggregation, which we currently do via a simple REST put to a web service. Whether/when we move to those implementations will depend on the urgency with which we run into the issues mentioned in the doc with REST APIs. But it's open at this point in time.

          Patrick Wendell added a comment -

          Just gave this a read. Based on my understanding, the design here is basically an indexing service for time-series metadata from applications. The major design decisions are that the API is REST for both inserting and removing data and that the data format will be fairly structured, include a first-class notion of time, and support filtering based on some dimensional information. Other questions like “how is the data persisted” and “what type of intermediate aggregation do we support” seem to be undecided at this point or will be pluggable.

          I can give feedback from the perspective of Spark, which is an application that runs on YARN but is not MapReduce. In Spark’s case, while we enthusiastically support YARN, we also support other resource managers. So it’s unlikely we’d ever add this indexing service as a dependency in the way we architect our UI persistence. However, we are in the process of thinking about building a history server component right now, so it would be nice to structure things in a way where this can be leveraged in YARN environments. The fact that the API is simple (REST) is a big +1 in that regard.

          My biggest concern with this design is the notion of sending live data to a single node rather than writing through HDFS. In Spark, tasks can easily be 100 milliseconds or less. This means that even a short Spark job can execute tens of thousands of tasks, and a large Spark job can execute hundreds of thousands of tasks or more. It’s easily an order of magnitude more tasks per unit time than MR, and we also track a large amount of instrumentation per task since users tend to be very performance conscious. So I might worry about the rate at which events can be reported over REST vs. over a bulk transfer through compressed HDFS files.

          Another question - if we wanted to write an “approved” UI that would be served from within the same JVM, what would be the interface between that UI and the indexing service? Would it also speak REST just within a single process, or is it some other interface?

          A final question - what is the security reason why YARN can't link to a framework-specific UI? It seems like whether the user has a link to the URL and whether it's secure are unrelated. I’m not super familiar with the security model around web UI’s in YARN though...

          Vinod Kumar Vavilapalli added a comment -

          Thanks for your thoughts, Patrick!

          My biggest concern with this design is the notion of sending live data to a single node rather than writing through HDFS.

          From the client point of view (AMs and containers), this is really an implementation detail and is part of the event-aggregation system that I referred to in the document. I've seen implementations of at least a couple of these aggregation systems and after getting enough site-specific requests to be able to use Flume/Kafka/simple web-service/HDFS/HBase, I decided to bake in some sort of pluggability here. It is entirely conceivable to do what you are mentioning.

          I thought I mentioned the throughput of events. We do care about it for the sake of applications like Storm, TEZ, (and now Spark) that push out an order of magnitude more information than today's MR. We are pursuing different implementations, the first of which is most likely going to be HBase. We can optionally do an HBase-based implementation without a lot of effort. In fact, that is exactly what the generic history service (YARN-321) does, and we are thinking of retrofitting that into this abstraction.

          In sum, REST is the user API and there is a different abstraction for event aggregation. With this, I can see an HDFS-bus implementation that does what you want.

          if we wanted to write an “approved” UI that would be served from within the same JVM, what would be the interface between that UI and the indexing service?

          Same JVM == AM? IAC, the service is agnostic of where you run the UI code.

          what is the security reason why YARN can't link to a framework-specific UI?

          I should add more clarity there, perhaps. The fundamental problem is that any user can write a YARN app and host his/her own UI. References to these UIs eventually land on the YARN consoles (RM/AHS) etc. and can be used by malicious users to steal others' credentials via XSS or via simple, unnoticeable redirection. That is why today we proxy all application UIs through a central proxy server and ask users not to click on any link that isn't through this proxy. Framework-specific UIs for serving history also fit the same pattern.

          Let me know if the above make sense.

          That said, I'd like to see what can be done here so as to bring Spark on board with benefits for both projects.

          So it’s unlikely we’d ever add this indexing service as a dependency in the way we architect our UI persistence.

          If your UI can be written so the presentation layer is separated from the information provider services (which you may want to do anyways) and the interaction is through REST, I can totally imagine being able to reuse your UI code with and without using this YARN specific service. I can even think of putting this out of YARN - it doesn't necessarily belong to YARN core - so that you can use it in isolation.

          The overarching theme is to do whatever it takes to not duplicate this same effort (the collection of all the main problems-to-solve in the document) in each of the individual projects like Spark, Storm, TEZ etc.

          Patrick Wendell added a comment -

          Hey,

          Thanks for the explanation! To make sure I understand how this would all work, let me walk through an example.

          For the Spark UI, we are currently implementing the ability to serialize and write events to HDFS, then load them later from a history server that can render the UI for jobs that are finished. AFAIK this is basically how MapReduce works as well.

          Say users have set up a YARN cluster and they set up event ingestion to this shared store. Then Spark would need two things to integrate with it:

          1. Be able to represent our events in JSON and hook into whatever source the user has set up for ingestion (flume, HDFS, etc).
          2. Be able to render our history timeline UI by reading event data from this store.

          Correct?

          The benefit would be that if users set up something fancy like Flume, they could leverage the same infrastructure for Spark as for other applications, since there is a shared event model. Also, they would benefit from the faster indexed serving offered by this application when rendering the "history" UI...

          Is that the main idea? I'm just trying to figure out what redundant work is saved by having a generic framework, since each application writes its own UI and has its own event model. From what I can tell, the benefit is that a shared ingestion and serving infrastructure can be used.

          Zhijie Shen added a comment -

          1. Be able to represent our events in JSON and hook into whatever source the user has set up for ingestion (flume, HDFS, etc).

          Currently, you can either compose your JSON content and send the HTTP request yourself, or make use of the ATSClient and ATSXXXX POJO classes (the names may be refactored). The latter way should be comparatively easier. If the communication medium is changed in the future, we're going to provide the relevant user library to publish data easily.
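
          For illustration, a put through the client library might look roughly like the sketch below. It uses the later refactored names (TimelineClient, TimelineEntity, TimelineEvent); at the time of this comment the classes were still the ATS-prefixed ones, so treat the exact class and package names as assumptions:

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
            import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
            import org.apache.hadoop.yarn.client.api.TimelineClient;

            public class TimelinePutExample {
              public static void main(String[] args) throws Exception {
                TimelineClient client = TimelineClient.createTimelineClient();
                client.init(new Configuration());
                client.start();

                // Framework-defined entity: the type and id are whatever your app uses.
                TimelineEntity entity = new TimelineEntity();
                entity.setEntityType("SPARK_JOB");
                entity.setEntityId("job_0001");
                entity.setStartTime(System.currentTimeMillis());

                TimelineEvent event = new TimelineEvent();
                event.setEventType("JOB_STARTED");
                event.setTimestamp(System.currentTimeMillis());
                entity.addEvent(event);

                // Serialized to JSON and sent to the timeline REST endpoint under the
                // hood; several entities can be batched into a single call.
                client.putEntities(entity);
                client.stop();
              }
            }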

          Is that the main idea?

          Yes, I think so. We want to relieve developers from building a history server from nothing. Let YARN provide the infrastructure for generic applications, and each application focuses on its individual logic, such as the data model and how to render the data.

          Zhijie Shen made changes - Link: This issue is related to YARN-1744
          Zhijie Shen added a comment -

          I've done some stress testing of the timeline service. I've taken two steps:

          1. I set up a YARN cluster of 5 nodes (1 master and 4 slaves). I ran 7200 MapReduce example jobs with the Tez framework (which posts Tez entities to the timeline service) in 10 hours. The max number of concurrent jobs was 7, because it was not a big cluster (32G memory only), but the real concurrency should be higher because of multiple mappers/reducers. The workload was kept almost full for 10 hours. All the jobs succeeded, and the Tez entities were stored in the timeline store without exceptions. The leveldb-based timeline store grew to about 220MB (not very big because of the small example jobs).

          2. Afterwards, I tested concurrent reads/writes together. On the write part, I did the same thing as step 1. On the read part, I set up 4 timeline query clients, one on each slave node. Each client started 10 parallel threads to send requests to the timeline service for 10 hours as well. Each client sent more than 6 million queries during the 10 hours with a combination of three RESTful APIs (24+ million total for the 4 clients). In general, the timeline service was still working well. I saw just one query that was answered with a not-found exception, and some other JVM warnings. The get-entities query takes on the order of 0.x seconds on average, while the get-entity/get-events queries take on the order of 0.0x seconds.

          Therefore, the timeline service with the leveldb store works well so far. I'll do more stress testing with big entities, and update you once I have some metrics.

          Zhijie Shen made changes - Link: This issue is related to YARN-1452
          Zhijie Shen made changes - Link: This issue is related to YARN-1701
          Zhijie Shen made changes - Link: This issue is depended upon by MAPREDUCE-5858
          Zhijie Shen made changes - Link: This issue is related to YARN-1982
          Vinod Kumar Vavilapalli made changes - Link: This issue is related to YARN-1982
          Zhijie Shen made changes - Link: This issue is related to YARN-1935
          Gunther Hagleitner made changes - Link: This issue relates to HIVE-7076
          Sangjin Lee added a comment -

          Some members of the Hadoop open source community who are involved in the ATS development, and others, had a community meet-up hosted by Twitter. We had wide-ranging and free discussions on critical use cases and needs, the current state of ATS and where it is going, and some concerns and feedback on the current design.

          We hope that some of those items will be incorporated into this work, and will serve the community better.

          I am posting the meeting notes from the meet-up so everyone can view what was discussed. Feedback and comments are welcome. Thanks!

          Sangjin Lee made changes - Attachment: ATS-meet-up-8-28-2014-notes.pdf
          Robert Kanter added a comment -

          Thanks Sangjin for posting those notes.


          I've written up a rough proposal based on some of those discussions on how we can improve the scalability and reliability of the ATS write path and attached it to this JIRA: "ATS-Write-Pipeline-Design-Proposal.pdf". I'd like to use this to drive further discussions and hopefully let us all work towards this design (or a design that comes out of these discussions).

          Robert Kanter made changes - Attachment: ATS-Write-Pipeline-Design-Proposal.pdf
          Zhijie Shen added a comment -

          Robert Kanter, thanks for proposing the idea of pipeline writing, which sounds interesting. W.R.T the two proposed solutions, I have some general comments.

          1. HDFS-based implementation:

          It sounds like an interesting idea to use HDFS to persist unpublished timeline entities, but I’m not sure it’s going to solve the scalability problem. Though each application can write the timeline entities into HDFS in a distributed manner, there’s still a single timeline server that fetches the files of the timeline entities written by ALL applications. The bottleneck is still there. Essentially, I don’t see any difference between publishing entities via the HTTP REST interface and via HDFS in terms of scalability. And given the same timeline server, I’m afraid that the HTTP REST interface is very likely to be more efficient:

          a) Less I/O against the secondary storage;
          b) Built-in multithreading mechanism from the web server;
          c) No delay of waiting until the next round of file fetching.

          The current writing channel allows the data to be available on the timeline server immediately, such that we can support realtime or near-realtime queries about the application metrics. Because of c), I’m not sure this is still feasible with the HDFS writing channel (won't too-frequent fetching cause performance problems?).

          IMHO, the ESSENTIAL problem is that a single instance of the timeline server is not going to be able to take in a huge number of concurrent requests (no matter whether they’re from HTTP REST, RPC or HDFS). Let’s assume we’re going to have the HBase timeline store (YARN-2032), which is scalable and reliable. Thanks to the stateless nature, the proper way to scale up the timeline service is to start a number of federated timeline servers and connect them to the same HBase timeline store. This design will solve the high availability problem as well. Moreover, it scales not just the writing channel, but also the reading one. I’ll file a separate ticket about the scalable and highly available timeline server.

          2. Direct-writing implementation:

          The biggest concern here is that the solution breaks apart the current timeline server architecture:
          a) Accepting requests from users ->
          b) Pre-processing the timeline entities and verifying users’ access ->
          c) Translating the timeline entities into key/value pairs

          If you want to open the option for the client to write into the data store directly, all this logic has to be moved to the client. It’s a more complex stack than some simple put calls to the data store. It not only adds more dependencies to the timeline client, but changes it from a thin client to a fat one. This makes it more difficult to distribute the timeline client and upgrade it in the future, and puts a heavier load on the user. Importantly, I’m not sure we will be able to verify a user's access at the client side, or whether HBase access control is enough for our specific timeline ACLs.

          Another concern is that the client is strongly coupled with a particular type of data store, such as HBase. If we choose to use Leveldb (or Rocksdb), the current client for HBase used at the application side is going to be broken. In other words, you have to change the client and the data store simultaneously.

          Misc. SLA of TimelineClient

          The proposal brings up another interesting question: the SLA of the timeline server and the client. The timeline server should be reliable: when a timeline entity is accepted at the timeline server, it should not be lost, which is ensured by a reliable data store (e.g. HBase): YARN-2032. It is questionable whether we also want the timeline client to be reliable: when a timeline entity is passed to the timeline client, it should not be lost before being accepted by the timeline server, which may be down for a while. Hence, a local cache in HDFS may be a good idea. Or we can use an even more lightweight solution: Leveldb. I’ll file a ticket about it as well.
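
          A minimal sketch of the kind of client-side spool being suggested (a hypothetical class, not part of the actual client; it only assumes the standard leveldbjni API that Hadoop already ships):

            import java.io.File;
            import java.nio.charset.StandardCharsets;
            import org.fusesource.leveldbjni.JniDBFactory;
            import org.iq80.leveldb.DB;
            import org.iq80.leveldb.Options;

            // Entities are persisted locally before being published and removed once the
            // timeline server acknowledges them, so a server outage doesn't lose data.
            public class LocalEntitySpool {
              private final DB db;

              public LocalEntitySpool(File dir) throws Exception {
                db = JniDBFactory.factory.open(dir, new Options().createIfMissing(true));
              }

              public void spool(String entityKey, String entityJson) {
                db.put(entityKey.getBytes(StandardCharsets.UTF_8),
                       entityJson.getBytes(StandardCharsets.UTF_8));
              }

              public void acknowledge(String entityKey) {
                db.delete(entityKey.getBytes(StandardCharsets.UTF_8));
              }

              public void close() throws Exception {
                db.close();
              }
            }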

          Sangjin Lee added a comment -

          The bottleneck is still there. Essentially I don’t see any difference between publishing entities via HTTP REST interface and via HDFS in terms of scalability.

          IMO, option (1) necessarily entails less frequent imports into the store by ATS. Obviously, if ATS still imports the HDFS files at the same speed as the timeline entries are generated, there would be no difference in scalability. This option would make sense only if the imports are less frequent. It also would mean that as a trade-off reads would be more stale. I believe Robert's document points out all those points.

          Regarding option (2), I think your point is valid that it would be a transition from a thin client to a fat client. And along with that would be some complications as you point out.

          However, I'm not too sure if it would make changing the data store much more complicated than other scenarios. I think the main problem of switching the data store is when not all writers are updated to point to the new data store. If writes are in progress, and the clients are being upgraded, there would be some inconsistencies between clients that were already upgraded and started writing to the new store and those that are not upgraded yet and still writing to the old store. If you have a single writer (such as the current ATS design), then it would be simpler. But then again, if we consider a scenario such as a cluster of ATS instances, the same problem exists there. I think that specific problem could be solved by holding the writes in some sort of a backup area (e.g. hdfs) before the switch starts, and recovering/re-enabling once all the writers are upgraded.

          The idea of a cluster of ATS instances (multiple write/read instances) sounds interesting. It might be able to address the scalability/reliability problem at hand. We'd need to think through and poke holes to see if the idea holds up well, however. It would need to address how load balancing would be done and whether it would be left up to the user, for example.

          Zhijie Shen added a comment -

          Sangjin Lee, thanks for your feedback. Here're some additional thoughts and clarifications upon your comments.

          This option would make sense only if the imports are less frequent.

          To be more specific, I mean that sending the same amount of entities (not too big; if too big, the HTTP REST request has to chunk them into several consecutive HTTP requests of reasonable size) via HTTP REST or HDFS should perform similarly. HTTP REST may be better because of less secondary storage I/O (Ethernet should be faster than disk). HTTP REST doesn't prevent the user from batching the entities and putting them at once, and the current API supports it. It's up to the user to put the entity immediately for realtime/near-realtime inquiry, or to batch entities if they can tolerate some delay.

          However, I agree HDFS or some other single-node storage technique is an interesting way to prevent losing the entities when they are not yet published to the timeline server, in particular when we are batching them.

          Regarding option (2), I think your point is valid that it would be a transition from a thin client to a fat client.

          However, I'm not too sure if it would make changing the data store much more complicated than other scenarios.

          I'm also not very sure about the necessary changes. As I mentioned before, the timeline server doesn't simply put the entities into the data store. One immediate problem I can come up with is authorization. I'm not sure it's going to be logically correct to check the user's access in the client at the user's side. If we move authorization to the data store, HBase supports access control, but Leveldb seems not to. And I'm not sure HBase access control is enough for the timeline server's specific logic. I still need to think more about it.

          As the client grows fatter, it's difficult to maintain different versions of clients. For example, if we do some incompatible optimization of the storage schema, only the new client can write into it, while the old client will not work any more. Moreover, as most of the writing logic runs in user land, which is not predictable, it is more likely to raise unexpected failures than a well-set-up server. In general, I prefer to keep the client simple, such that future client distribution and maintenance takes less effort.

          But then again, if we consider a scenario such as a cluster of ATS instances, the same problem exists there.

          Right, the same problem will exist on the server side, but the web front end has isolated it from the users. Compared to the clients in the applications, the ATS instances are a relatively small, controllable set that we can pause and put through a proper upgrade process. What do you think?

          Hide
          bc Wong added a comment -

          The current writing channel allows the data to be available on the timeline server immediately

          Let's have reliability before speed. I think one of the requirements of ATS is: The channel for writing events should be reliable.

          I'm using reliable here in a strong sense, not the TCP-best-effort style reliability. HDFS is reliable. Kafka is reliable. (They are also scalable and robust.) A normal RPC connection is not. I don't want the ATS to be able to slow down my writes, and therefore, my applications, at all. For example, an ATS failover shouldn't pause all my applications for N seconds. A direct RPC to the ATS for writing seems a poor choice in general.

          Yes, you could make a distributed reliable scalable "ATS service" to accept writing events. But that seems a lot of work, while we can leverage existing technologies.

          If the channel itself is pluggable, then we have lots of options. Kafka is a very good choice, for sites that already deploy Kafka and know how to operate it. Using HDFS as a channel is also a good default implementation, because people already know how to scale and manage HDFS. Embedding a Kafka broker with each ATS daemon is also an option, if we're ok with that dependency.
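          For illustration only, here is a rough sketch of what a Kafka-backed channel behind a pluggable writer interface might look like. The TimelineEventChannel interface, the class name, and the topic name are all hypothetical, and the snippet assumes the standard Kafka Java producer client rather than any existing YARN API:

          import java.util.Properties;

          import org.apache.kafka.clients.producer.KafkaProducer;
          import org.apache.kafka.clients.producer.ProducerRecord;

          /** Hypothetical client-side channel abstraction for publishing serialized timeline entities. */
          interface TimelineEventChannel {
            void send(String appId, String serializedEntity);
            void close();
          }

          /** Kafka-backed implementation; the ATS (or any other consumer) ingests the topic later. */
          class KafkaTimelineEventChannel implements TimelineEventChannel {
            private final KafkaProducer<String, String> producer;

            KafkaTimelineEventChannel(String bootstrapServers) {
              Properties props = new Properties();
              props.put("bootstrap.servers", bootstrapServers);
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              producer = new KafkaProducer<String, String>(props);
            }

            @Override
            public void send(String appId, String serializedEntity) {
              // Keyed by appId so all events of one application land in the same partition, in order.
              producer.send(new ProducerRecord<String, String>("yarn.timeline.events", appId, serializedEntity));
            }

            @Override
            public void close() {
              producer.close();
            }
          }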

          Zhijie Shen added a comment - edited

          bc Wong, thanks for your interest in the timeline server and for sharing your ideas. Here are some of my opinions and our previous rationale.

          Let's have reliability before speed. I think one of the requirements of ATS is: The channel for writing events should be reliable.

          I agree reliability is an important requirement of the timeline server, but other requirements such as scalability and efficiency should be orthogonal to it, so there's no strict order of which should come first. We can pursue both enhancements, can't we?

          I'm using reliable here in a strong sense, not the TCP-best-effort style reliability. HDFS is reliable. Kafka is reliable. (They are also scalable and robust.)

          IMHO, it may be unfair to compare the reliability of TCP with that of HDFS or Kafka, because they sit at different layers of the communication stack. HDFS and Kafka are also built on top of TCP for communication, right? In my previous comments, I've mentioned that we need to clearly define reliability, and I'd like to highlight it here again:

          1. Server is reliable: once timeline entities are passed to the timeline server, it should prevent them from being lost. After YARN-2032, we're going to have the HBase timeline store to ensure this.

          2. Client is reliable: once the timeline entities are handed over to the timeline client, and before the timeline client successfully puts them into the timeline server, it should prevent them from being lost on the client side. We may use some techniques to cache the entities locally. I opened YARN-2521 to track the discussion along this direction.

          Between client and server, TCP is a trustworthy protocol. If the client gets an ACK from the server, we should be confident that the server has already received the entities.

          A normal RPC connection is not. I don't want the ATS to be able to slow down my writes, and therefore, my applications, at all.

          I'm not sure there's a direct relationship between reliability and non-blocking writing. For example, submitting an app via YarnClient to an HA RM is reliable, but the user is still likely to be blocked until the app submission is acknowledged. Whether writing events is blocking or non-blocking depends on how the user uses the client. In YARN-2033, I made the RM put the entities on a separate thread to avoid blocking the dispatcher that manages the YARN app lifecycle. And I can see that non-blocking writing is a useful optimization, so I've opened YARN-2517 to implement TimelineClientAsync for general usage.
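          For illustration, a minimal sketch of how an application could already put entities on a separate thread with today's blocking client; the class name is hypothetical, and TimelineClientAsync in YARN-2517 may end up looking different:

          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;

          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
          import org.apache.hadoop.yarn.client.api.TimelineClient;

          /** Publishes entities from a dedicated thread so the caller never blocks on the REST call. */
          public class NonBlockingTimelinePublisher {
            private final TimelineClient client = TimelineClient.createTimelineClient();
            private final ExecutorService publisher = Executors.newSingleThreadExecutor();

            public NonBlockingTimelinePublisher(Configuration conf) {
              client.init(conf);
              client.start();
            }

            /** Hands the entity off to the background thread; the caller returns immediately. */
            public void publish(final TimelineEntity entity) {
              publisher.submit(new Runnable() {
                @Override
                public void run() {
                  try {
                    client.putEntities(entity);
                  } catch (Exception e) {
                    // A real client would retry or spill to a local cache (see YARN-2521).
                  }
                }
              });
            }

            public void stop() {
              publisher.shutdown();
              client.stop();
            }
          }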

          Yes, you could make a distributed reliable scalable "ATS service" to accept writing events. But that seems a lot of work, while we can leverage existing technologies.

          AFAIK, the timeline server is stateless, so it should not be difficult to use ZooKeeper to manage a number of instances writing to the same HBase cluster. We may need to pay attention to load balancing and concurrent writing, but I'm not sure it will really be a lot of work. Please let me know if I've neglected some important pieces. Within YARN we have already accumulated similar experience when making the RM highly available, and it turned out to be a practical solution. Again, this is about scalability, which is orthogonal to reliability. Even if we pass the timeline entities to the timeline server via Kafka/HDFS, the single server is still going to be the bottleneck when processing a large volume of requests, no matter how big the Kafka/HDFS cluster is.
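          As a rough sketch of the kind of coordination I mean (znode paths and the session timeout are made up, and a production setup would add retries and parent-node creation), each stateless timeline server instance could simply announce itself under an ephemeral znode, and clients or a load balancer would list the children to discover live instances:

          import org.apache.zookeeper.CreateMode;
          import org.apache.zookeeper.WatchedEvent;
          import org.apache.zookeeper.Watcher;
          import org.apache.zookeeper.ZooDefs;
          import org.apache.zookeeper.ZooKeeper;

          public class AtsInstanceRegistration {
            /** Registers this instance; the ephemeral znode disappears automatically if the instance dies. */
            public static void register(String zkQuorum, String hostPort) throws Exception {
              ZooKeeper zk = new ZooKeeper(zkQuorum, 30000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                  // Connection-state events are ignored in this sketch.
                }
              });
              // Assumes the parent path /ats/instances already exists.
              zk.create("/ats/instances/instance-", hostPort.getBytes("UTF-8"),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            }
          }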

          If the channel itself is pluggable, then we have lots of options. Kafka is a very good choice, for sites that already deploy Kafka and know how to operate it. Using HDFS as a channel is also a good default implementation, because people already know how to scale and manage HDFS.

          I'm not opposed to having different entity publishing channels, but my concern is that the effort to maintain the timeline client will multiply with the number of channels. As the timeline server is going to be a long-term project, we cannot neglect the additional workload of evolving all the channels. This is the same concern behind wanting to remove the FS-based history store (see YARN-2320). Maybe cooperatively improving the current channel is a more cost-efficient choice. It's good to think more before opening a new channel.

          In addition, the default solution should be simple and self-contained. A heavy solution with complex configuration and large dependencies is likely to steepen the learning curve, keep new adopters away, and complicate fast, small-scale deployments.

          bc Wong added a comment -

          Hi Zhijie Shen. My main concern with the write path is: Does the ATS write path have the right reliability, robustness and scalability so that its failures would not affect my apps? I'll try to explain it with specific scenarios and technology choices. Then maybe you can tell me if those are valid concerns.

          First, to make it easy for other readers here, I'm advocating that this event flow:
          Client/App -> Reliable channel where event is persisted (HDFS/Kafka) -> ATS
          is a lot better than:
          Client/App -> RPC -> ATS

          Scenario 1. ATS service goes down

          If we use a reliable channel (e.g. HDFS) for writes, then apps do not suffer at all even when the ATS goes down. The ATS service going down is a valid scenario, due to causes ranging from bugs to hardware failures. Having the write path decoupled from the ATS service being up all the time seems a clear win to me. Writing decoupled components is also a good distributed systems design principle.
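          For example (a minimal sketch with a made-up directory layout; a real channel would also handle rolling files, retries, and cleanup), the application side of an HDFS write channel could be as simple as appending one serialized entity per line and hflush-ing so an ATS ingester can see it:

          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.fs.FSDataOutputStream;
          import org.apache.hadoop.fs.FileSystem;
          import org.apache.hadoop.fs.Path;

          /** App-side half of an HDFS-backed write channel: one JSON entity per line, per application. */
          public class HdfsTimelineChannel {
            private final FSDataOutputStream out;

            public HdfsTimelineChannel(Configuration conf, String appId) throws Exception {
              FileSystem fs = FileSystem.get(conf);
              out = fs.create(new Path("/ats/active/" + appId + "/entities.jsonl"), true);
            }

            public synchronized void write(String entityJson) throws Exception {
              out.writeBytes(entityJson + "\n");
              out.hflush();   // make the event visible to the ATS reader without closing the file
            }

            public void close() throws Exception {
              out.close();
            }
          }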

          On the other hand, one may argue that the ATS service will never go down entirely, or is not supposed to go down entirely, just like we don't expect all the ZK nodes or all the RM nodes to go down. That argument then justifies using direct RPC for writes. Yes, you can design such an ATS service. To this I'll say:

          • YARN apps already depend on ZK/RM/HDFS being up. Every new service dependency we add will only increase the chances of YARN apps failing or slowing down. That's true even if the ATS service's uptime is as good as ZK or RM.
          • Realistically, getting the ATS service's uptime to the same level as ZK or HDFS is a long and winding road. Especially when most discussions here assume HBase as the backing store. HBase's uptime is lower than HDFS/ZK/RM because it's more complex to operate. If HBase going down means ATS service going down, then we certainly should guard against this failure scenario.

          Scenario 2. ATS service partially down

          If the client writes directly to the ATS service using an unreliable channel (RPC), then the write path will do failover if one of the ATS nodes fails. This transient failure still affects the performance of YARN apps. One can argue that non-blocking RPC writes resolve this issue. To this I'll say:

          • Non-blocking RPC writes only work for long-duration apps. We already have short-lived applications, in the range of a few minutes. With Spark getting more popular, this will continue to happen. How short will the app duration get? The answer is a few seconds, if we want YARN to be the generic cluster scheduler. Google already sees that kind of job profile, if you look at their cluster traces. Of course, our scheduler and container allocation need to get a lot better for that to happen. But I think that's the goal. Our ATS design here should consider short-lived applications.
          • It sucks if you're running an app that's supposed to finish under a minute, but then the ATS writes are stalled for an extra minute because one ATS node does a failover. Again, we can go back to the counter-argument in scenario #1, about how unlikely this is. I'll repeat that it's more likely than we think. And if we have a choice to decouple the write path from the ATS service, why not?

          Scenario 3. ATS backing store fails

          By backing store, I mean the storage system where ATS persists the events, such as LevelDB and HBase. In a naive implementation, it seems that if the backing store fails, then the ATS service will be unavailable. Does that mean the event write path will fail, and the YARN apps will stall or fail? I hope not. It's not an issue if we use HDFS as the default write channel, because most YARN apps already depend on HDFS.

          One may argue that the ATS service will buffer writes (persist them elsewhere) if the backing store fails. To this I'll say:

          • If we have an alternate code path to persist events first before they hit the final backing store, why not do that all the time? Such a path will address scenario #1 and #2 as well.
          • HBase has been mentioned as if it's the penicillin of event storage here. That is probably true for big shops like Twitter and Yahoo, who have the expertise to operate an HBase cluster well. But most enterprise users or startups don't. We should assume that those HBase instances will run suboptimally with occasional widespread failures. Using HBase for event storage is a poor fit for most people. And I think it's difficult to achieve good uptime for the ATS service as a whole.
          eric baldeschwieler added a comment -

          Makes sense to me.
          Thanks,

          Eric14
          — a.k.a. Eric Baldeschwieler

          Maysam Yabandeh added a comment -

          YARN apps already depend on ZK/RM/HDFS being up. Every new service dependency we add will only increase the chances of YARN apps failing or slowing down. That's true even if the ATS service's uptime is as good as ZK or RM.

          Realistically, getting the ATS service's uptime to the same level as ZK or HDFS is a long and winding road. Especially when most discussions here assume HBase as the backing store. HBase's uptime is lower than HDFS/ZK/RM because it's more complex to operate. If HBase going down means ATS service going down, then we certainly should guard against this failure scenario.

          +1

          And if we have a choice to decouple the write path from the ATS service, why not?

          If we have an alternate code path to persist events first before they hit the final backing store, why not do that all the time?

          I would call that a reasonable approach. One alternative is also to use HDFS as the backup plan, i.e., use it when HBase is down. Anyway, with ATS being pluggable, I guess all approaches can grow independently.

          Robert Kanter added a comment -

          I also agree that providing reliability through an “always-up” ATS service is not the optimal solution here for the reasons already mentioned. We should instead make the write path and backing store reliable (or at least somehow recoverable).


          Though each application can write the timeline entities into HDFS in a distributed manner, there’s still a single timeline server that fetches the files of the timeline entities written by ALL applications. The bottleneck is still there. Essentially I don’t see any difference between publishing entities via HTTP REST interface and via HDFS in terms of scalability.

          Technically yes, there is still the same bottleneck. However, with the HDFS channel, the ATS can essentially throttle the events. Suppose you have a cluster pushing X events/second to the ATS. With the REST implementation, the ATS must try to handle X events every second; if it can’t keep up, or if it gets too many incoming connections, there’s not too much we can do here. I suppose we could add active-active HA so we have more ATS servers running, but I’m not sure we want to make that a requirement; we’d also have to come up with a good way of balancing this. With the HDFS implementation, the ATS has more control over how it ingests the events: for example, it could read a maximum of Y events per poll, or Y events per job, etc. While this will slow down the availability of the events in the ATS, it will allow it to keep running normally and not require active-active HA. And if we make this configurable enough, users with beefier ATS machines could increase Y.
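          To make the throttling idea concrete, here is a rough sketch of the ATS-side ingest loop; EventChannelReader and TimelineStoreWriter are hypothetical interfaces standing in for the HDFS scanner and the backing store:

          import java.util.List;

          /** Drains at most maxEventsPerPoll events per cycle, so a burst of writes becomes extra latency rather than overload. */
          public class ThrottledIngester implements Runnable {

            public interface EventChannelReader {
              /** Returns up to 'max' pending serialized events (e.g. by scanning new HDFS files). */
              List<String> poll(int max) throws Exception;
            }

            public interface TimelineStoreWriter {
              void put(List<String> serializedEntities) throws Exception;
            }

            private final EventChannelReader channel;
            private final TimelineStoreWriter store;
            private final int maxEventsPerPoll;
            private final long pollIntervalMs;
            private volatile boolean running = true;

            public ThrottledIngester(EventChannelReader channel, TimelineStoreWriter store,
                                     int maxEventsPerPoll, long pollIntervalMs) {
              this.channel = channel;
              this.store = store;
              this.maxEventsPerPoll = maxEventsPerPoll;
              this.pollIntervalMs = pollIntervalMs;
            }

            @Override
            public void run() {
              while (running) {
                try {
                  List<String> batch = channel.poll(maxEventsPerPoll);
                  if (!batch.isEmpty()) {
                    store.put(batch);
                  } else {
                    Thread.sleep(pollIntervalMs);  // nothing pending, back off
                  }
                } catch (Exception e) {
                  // Events stay in the channel and will be retried on the next poll.
                }
              }
            }

            public void stop() { running = false; }
          }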

          It sounds like there are two areas where we’re having difficulty coming to a consensus:

          1. The write path/communication channel from the TimelineClient to the ATS or backing store
          2. The backing store itself

          I can see reasons for having different implementations for the backing store given that HBase is a “heavy” external service and we should have something that works out-of-the-box. Ideally, I think it would be best if we could all agree on a single write path, though making it pluggable is certainly an option. As for maintaining them, I think we should be fine as long as we don’t have too many implementations. We already do that for other components, such as the scheduler; though we should be careful to make sure that the different implementations only implement what they need to and any shareable code is shared. In making the write path pluggable, we’d have to have two pieces: one to do the writing from the TimelineClient and one to do the receiving in the ATS. These would have to come in pairs. We’ve already discussed some different implementations for this: REST, Kafka, and HDFS.
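          A bare-bones sketch of what such a pair of plugin points could look like (the interface names are made up; only TimelineEntities is an existing YARN class):

          import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;

          /** Client half of a pluggable write path: how TimelineClient hands entities to a channel. */
          interface TimelineWriteChannel {
            void putEntities(TimelineEntities entities) throws Exception;
            void close() throws Exception;
          }

          /** Server half: how the ATS receives entities arriving on the matching channel. */
          interface TimelineChannelReceiver {
            /** Returns the next batch of entities, or an empty batch if none are pending. */
            TimelineEntities nextBatch() throws Exception;
          }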

          The backing store is already pluggable. Though as bc pointed out before, it’s fine for more experienced users to use HBase, but “regular” users should have a solution as well that is hopefully more scalable and reliable than LevelDB. It would be great if we could provide a backing store that’s in between LevelDB and HBase. And I think it’s fine for it to be external to Hadoop as long as it’s relatively simple to set up and maintain. Though I’ll admit I’m not really sure which store we could use. Does anyone have any suggestions on this?

          Zhijie Shen added a comment -

          Scenario 1. ATS service goes down

          Scenario 2. ATS service partially down

          In general, I agree the concerns about the scenarios where the timeline server is (partially) down make sense. However, if we change the subject from ATS to HDFS/Kafka, I'm afraid we can reach a similar conclusion. For example, HDFS can temporarily be unwritable (we have actually observed this issue around YARN log aggregation). I can see the argument carries an obvious implication that the timeline server can go down but HDFS/Kafka will not, and that is correct to some extent based on the current timeline server SLA. Therefore, is making the timeline server reliable (or always-up) the essential solution? If the timeline server is reliable, it relaxes the requirement to persist entities in a third place (this is the basic benefit I can see in the HDFS/Kafka channel). While it may take a while to make the timeline server as reliable as HDFS/Kafka, we can make progress step by step; for example, YARN-2520 should realistically be achievable within a reasonable timeline.

          Of course, there may still be a reliability gap between ATS/HBase and HDFS/Kafka (actually, I'm not experienced with the reliability of the latter components; please let me know how big the gap really is). It is arguable that we still need to persist the entities in HDFS/Kafka when ATS/HBase is not available but HDFS/Kafka is. However, if we need to improve the timeline server's reliability anyway, perhaps we should think carefully about the cost-effectiveness of implementing and maintaining another writing channel to bridge that gap.

          Scenario 3. ATS backing store fails

          In this scenario, the entities have already reached the timeline server, right? I consider it an internal reliability problem of the timeline server. As I mentioned in the previous threads, the requirement is that once an entity has reached the timeline server, the timeline server should take responsibility for preventing it from being lost. It's a good point that the data store may suffer an outage (just as HDFS can temporarily be unwritable). Keeping a local backup of the outstanding received requests should be an answer for this scenario.

          However, with the HDFS channel, the ATS can essentially throttle the events. Suppose you have a cluster pushing X events/second to the ATS. With the REST implementation, the ATS must try to handle X events every second; if it can’t keep up, or if it gets too many incoming connections, there’s not too much we can do here.

          This may not be an accurate judgement. I suppose you are comparing pushing each event in one REST request with writing a batch of X events into HDFS. The REST API allows you to batch X events and send them in one request. Please refer to TimelineClient#putEntities for the details.
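          For example (a minimal sketch; the entity types and ids below are made up), a framework can build up a batch locally and send it in a single REST request:

          import java.util.ArrayList;
          import java.util.List;

          import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
          import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
          import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
          import org.apache.hadoop.yarn.client.api.TimelineClient;
          import org.apache.hadoop.yarn.conf.YarnConfiguration;

          public class BatchedTimelinePut {
            public static void main(String[] args) throws Exception {
              TimelineClient client = TimelineClient.createTimelineClient();
              client.init(new YarnConfiguration());
              client.start();

              // Build a batch of entities locally instead of issuing one REST request per event.
              List<TimelineEntity> batch = new ArrayList<TimelineEntity>();
              for (int i = 0; i < 100; i++) {
                TimelineEntity entity = new TimelineEntity();
                entity.setEntityType("MY_FRAMEWORK_TASK");     // type and id values are made up
                entity.setEntityId("task_" + i);
                entity.setStartTime(System.currentTimeMillis());
                TimelineEvent event = new TimelineEvent();
                event.setEventType("TASK_STARTED");
                event.setTimestamp(System.currentTimeMillis());
                entity.addEvent(event);
                batch.add(entity);
              }

              // One HTTP PUT carries the whole batch; the response reports per-entity errors.
              TimelinePutResponse response =
                  client.putEntities(batch.toArray(new TimelineEntity[batch.size()]));
              System.out.println("Entities rejected: " + response.getErrors().size());

              client.stop();
            }
          }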

          In making the write path pluggable, we’d have to have two pieces: one to do the writing from the TimelineClient and one to do the receiving in the ATS. These would have to come in pairs. We’ve already discussed some different implementations for this: REST, Kafka, and HDFS.

          The backing store is already pluggable.

          No problem, it's feasible to make the write path pluggable. However, though the store is pluggable, LevelDB and HBase are relatively similar to each other compared with the HTTP REST vs. HDFS/Kafka pair. The more important point is that it's more difficult to manage different writing channels than different stores, because one is client-side and the other is server-side. On the server side, the YARN cluster operator has full control of the servers and a limited number of hosts to deal with. On the client side, the operator may not have access to the clients and doesn't know how many clients and how many types of apps he/she needs to deal with. TimelineClient is a generic tool (not tied to a particular application such as Spark), so it's good to keep it lightweight and portable. Again, it's still a cost-effectiveness question.

          Though as bc pointed out before, it’s fine for more experienced users to use HBase, but “regular” users should have a solution as well that is hopefully more scalable and reliable than LevelDB.

          Right, and this is also my concern about the HDFS/Kafka channel, particularly using it as the default. "Regular" users may not be experienced enough with HBase, or with HDFS/Kafka either. It very much depends on the users and the use cases.

          bc Wong and Robert Kanter, thanks for putting new ideas into the timeline service. In general, the timeline service is still a young project. We have different problems to solve and multiple ways to approach them. An additional writing channel is interesting, but given the whole roadmap of the timeline service, let's think critically about the work that can improve the timeline service most significantly. Hopefully you can understand my concern. Thanks!

          bc Wong added a comment -

          Hi Zhijie Shen. First, glad to see that we're discussing approaches. You seem to agree with the premise that ATS write path should not slow down apps.

          Therefore, is making the timeline server reliable (or always-up) the essential solution? If the timeline server is reliable, ...

          In theory, you can make the ATS always-up. In practice, we both know what real life distributed systems do. "Always-up" isn't the only thing. The write path needs to have good uptime and latency regardless of what's happening to the read path or the backing store.

          HDFS is a good default for the write channel because:

          • We don't have to design an ATS that is always-up. If you really want to, I'm sure you can eventually build something with good uptime. But it took other projects (HDFS, ZK) lots of hard work to get to that point.
          • If we reuse HDFS, cluster admins know how to operate HDFS and get good uptime from it. But it'll take training and hard-learned lessons for operators to figure out how to get good uptime from ATS, even after you build an always-up ATS.
          • All the popular YARN app frameworks (MR, Spark, etc.) already rely on HDFS by default. So do most of the 3rd party applications that I know of. Architecturally, it seems easier for admins to accept that ATS write path depends on HDFS for reliability, instead of a new component that (we claim) will be as reliable as HDFS/ZK.

          given the whole roadmap of the timeline service, let's think critically of work that can improve the timeline service most significantly.

          Exactly. Strong +1. If we can drop the high uptime + low write latency requirement from the ATS service, we can avoid tons of effort. ATS doesn't need to be as reliable as HDFS. We don't need to worry about insulating the write path from the read path. We don't need to worry about occasional hiccups in HBase (or whatever the store is). And at the end of all this, everybody sleeps better because "ATS service going down" isn't a catastrophic failure.

          Zhijie Shen added a comment -

          Hi, bc Wong. Thanks for your further comments.

          You seem to agree with the premise that ATS write path should not slow down apps.

          Definitely. The arguable point is whether the current timeline client is going to slow down the app, given that we have a scalable and reliable timeline server.

          If we can drop the high uptime + low write latency requirement from the ATS service, we can avoid tons of effort.

          I'm not sure such fundamental requirements can be dropped from the timeline service. Projecting into the future, scalable and highly available timeline servers have multiple benefits and enable different use cases. For example:

          1. We can use it to serve realtime or near-realtime data, so that we can go to the timeline server to see what is happening to an application. This is particularly useful for long-running services, which never shut down.

          2. We can build checkpoints on the timeline server for the app to do recovery once it crashes. It's pretty much like what we've done for MR jobs.

          I bundled "scalable" and "reliable" together because a multiple-instance solution will improve the timeline server in both dimensions.

          Moreover, no matter how scalable and reliable the channel is, we eventually want to get the timeline data into the timeline server, right? Otherwise, it is not going to be accessible to users (of course, tricks can be played to fetch it directly from HDFS, but that's a completely different story from the timeline server). If the apps are publishing 10GB of data per hour while the server can only process 1GB per hour, the 9GB of outstanding data per hour that sits in some temp location in HDFS amounts to useless writes.

          We have narrowed the discussion down to the reliability of the write path, but if we look at the big picture, the timeline server is not just a place to store data; it also serves the data to users (e.g., YARN-2513). In terms of use cases, users may want to monitor completed apps as well as running apps and the cluster. If the timeline server doesn't have the capacity to serve the data for a particular use case, the cost of aggregating that data is wasted. IMHO, a scalable and reliable timeline server is going to be the eventual solution that satisfies multiple stakeholders, regardless of whether the use case is read-intensive, write-intensive, or both. That's why I think improving the timeline server could be high-leverage work. It may be hard work, but we should definitely pick it up.

          Mit Desai made changes -
          Assignee Mit Desai [ mitdesai ]
          Mit Desai made changes -
          Assignee Mit Desai [ mitdesai ]
          Sangjin Lee made changes -
          Link This issue relates to YARN-2928 [ YARN-2928 ]
          Steve Loughran made changes -
          Link This issue is related to HADOOP-11826 [ HADOOP-11826 ]
          Zhijie Shen added a comment -

          Timeline service v1 is almost done. Most of the functionality has been committed across multiple releases, mostly before 2.6. There are still a few outstanding issues, which are kept open for further discussion.

          Zhijie Shen made changes -
          Status Open [ 1 ] Resolved [ 5 ]
          Resolution Fixed [ 1 ]
          Steve Loughran made changes -
          Link This issue is related to SPARK-1537 [ SPARK-1537 ]
          Transition: Open → Resolved
          Time In Source Status: 493d 23h 23m
          Execution Times: 1
          Last Executer: Zhijie Shen
          Last Execution Date: 01/May/15 23:52

            People

            • Assignee: Unassigned
            • Reporter: Vinod Kumar Vavilapalli
            • Votes: 0
            • Watchers: 69
