Hadoop Common / HADOOP-5018

Chukwa should support pipelined writers

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.21.0
    • Component/s: None
    • Labels: None
    • Hadoop Flags: Reviewed
    • Release Note: Chukwa supports pipelined writers for improved extensibility.

      Description

      We ought to support chaining together writers; this will radically increase flexibility and make it practical to add new features without major surgery by putting them in pass-through or filter classes.
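
      For illustration, here is a hypothetical sketch of the pass-through/filter idea. ChukwaWriter, PipelineableWriter, Chunk, and WriterException are names that appear in this issue, but the signatures below (including setNextStage and getData) are assumptions for the sketch, not the committed API:

      // Stand-ins for Chukwa's types, so the sketch is self-contained.
      class Chunk { byte[] data = new byte[0]; byte[] getData() { return data; } }
      class WriterException extends Exception { }

      interface ChukwaWriter {
        void init() throws WriterException;
        void add(java.util.List<Chunk> chunks) throws WriterException;
      }

      // A pipelineable stage knows about the next writer in the chain.
      abstract class PipelineableWriter implements ChukwaWriter {
        protected ChukwaWriter next;
        public void setNextStage(ChukwaWriter next) { this.next = next; }
      }

      // Example filter stage: drops empty chunks, passes the rest downstream.
      class FilterStageWriter extends PipelineableWriter {
        public void init() { }
        public void add(java.util.List<Chunk> chunks) throws WriterException {
          chunks.removeIf(c -> c.getData().length == 0);
          if (next != null) next.add(chunks);
        }
      }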

      Attachments

      1. pipeline4.patch (24 kB) - Ari Rabkin
      2. pipeline3.patch (24 kB) - Ari Rabkin
      3. pipeline2.patch (24 kB) - Ari Rabkin
      4. pipeline.patch (19 kB) - Ari Rabkin


          Activity

          Ari Rabkin added a comment -

          Fairly major surgery on the ChukwaWriter and ServletCollector classes in order to support dynamic creation of a writer pipeline. Adds some test code.

          Ari Rabkin added a comment -

          This will be very useful for Berkeley, since we want to do near-real-time collection, which we can do in a pipeline stage.

          Jerome Boulon added a comment -

          Hi Ari,
          I just want to let you know that I'm planning to remove the HDFS dependency:
          1) the collector will first write to the local file system, and then 2) the data will be pushed to a pub/sub framework to be used by real-time components.
          Later on, the data will be moved to HDFS by a background thread or process.

          Why 1 and 2?

          1) Because people may want to use Chukwa only to collect their data, without any Hadoop dependency.
          2) To make it easy to extend Chukwa just by listening to an event.

          The pub/sub framework will allow filtering by dataType and by tags like source/cluster, for example.

          I also want to solve the duplicate removal problem for chunks at the collector level.
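
          For illustration, a minimal sketch of the pub/sub idea described above: subscribers register a filter on dataType and tags, and get called per chunk after the local write succeeds. ChunkBus and ChunkSubscriber are hypothetical names, not Chukwa or Hadoop APIs:

          import java.util.List;
          import java.util.concurrent.CopyOnWriteArrayList;

          interface ChunkSubscriber {
            boolean accepts(String dataType, String tags);  // e.g. filter on source/cluster
            void onChunk(byte[] data);
          }

          class ChunkBus {
            private final List<ChunkSubscriber> subscribers = new CopyOnWriteArrayList<>();

            void subscribe(ChunkSubscriber s) { subscribers.add(s); }

            // Called by the collector after the chunk is durably written locally.
            void publish(String dataType, String tags, byte[] data) {
              for (ChunkSubscriber s : subscribers) {
                if (s.accepts(dataType, tags)) {
                  s.onChunk(data);
                }
              }
            }
          }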

          Ari Rabkin added a comment -

          Both 1 and 2 are worthy goals. I think that pipelines are a fairly natural way to accomplish both. I intended to write a pipeline stage for doing subscriptions for real-time delivery; if you're also working on that, it's pretty awesome, and we should open a JIRA.

          I hadn't thought of log-to-local-storage, but it should be easy to write a pipeline stage that stores everything locally, passes it through, and also has a worker thread that does the write to HDFS (see the sketch after this comment).

          What do you mean by removing the Hadoop dependency? I assume you don't literally mean breaking all dependence on Hadoop-core. But you can already point the SeqFileWriter at a local filesystem; you don't need an HDFS cluster.
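
          For illustration, a minimal sketch of the store-locally-then-upload stage described above, assuming plain JDK I/O plus a single background mover thread; LocalBufferWriter and its details are illustrative, not the committed Chukwa code:

          import java.io.IOException;
          import java.io.OutputStream;
          import java.nio.file.Files;
          import java.nio.file.Path;
          import java.nio.file.Paths;
          import java.util.List;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;

          class LocalBufferWriter {
            private final Path localDir = Paths.get("/tmp/chukwa/buffer");  // assumed location
            private final ExecutorService mover = Executors.newSingleThreadExecutor();

            void add(List<byte[]> chunks) throws IOException {
              Files.createDirectories(localDir);
              Path file = localDir.resolve("chunks-" + System.nanoTime() + ".dat");
              try (OutputStream out = Files.newOutputStream(file)) {
                for (byte[] c : chunks) out.write(c);      // durable local write first
              }
              mover.submit(() -> uploadToHdfs(file));      // upload off the hot path
              // ...then pass the chunks to the next pipeline stage
            }

            private void uploadToHdfs(Path file) {
              // In Chukwa this could use org.apache.hadoop.fs.FileSystem#copyFromLocalFile;
              // elided here to keep the sketch self-contained.
            }
          }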

          Jerome Boulon added a comment -

          >> What do you mean by removing the Hadoop dependency?
          The collector should not require HDFS, but it can use and/or take advantage of hadoop-core; this will be pipeline-dependent.

          Ari Rabkin added a comment - edited

          The SeqFileWriter doesn't require an HDFS system. You can point it at a local filesystem and it'll work fine.
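
          For illustration, a sketch of pointing the writer at a local filesystem via Hadoop's FileSystem API. The property name "writer.hdfs.filesystem" is an assumption to check against your Chukwa collector configuration; the key point is that a file:// URI yields a local FileSystem, so no HDFS cluster is needed:

          import java.net.URI;
          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.fs.FileSystem;

          public class LocalFsExample {
            public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              conf.set("writer.hdfs.filesystem", "file:///tmp/chukwa");  // assumed property name
              FileSystem fs = FileSystem.get(URI.create(conf.get("writer.hdfs.filesystem")), conf);
              System.out.println(fs.getUri());  // file:///
            }
          }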

          Eric Yang added a comment -

          Wouldn't it be better to decouple the pipeline logic from ServletCollector? It may be better to have an interface between ServletCollector and the pipeline logic. That way, the pipeline logic can be implemented as synchronous stages or parallel stages for different use cases, e.g. duplicate-data filtering or real-time monitoring alerts.

          Jerome Boulon added a comment - edited

          My point is that the PipelineWriter should be an implementation of the ChukwaWriter interface, and that's really the only thing the collector should be aware of.
          So, to be able to do what you want:

          1) The collector should instantiate one writer implementation based on its configuration.
          2) The writer should be able to get the collector configuration from somewhere (current design) or should have an init method with a Configuration parameter.
          3) The contract from the collector's point of view stays the same: call one method on the writer class, and the result is success if there's no exception.

          The delta with your implementation is:

          • Remove the code behind if (conf.get("chukwaCollector.pipeline") != null) ...
          • Replace it with something like:

          String writerClassName = conf.get("chukwaCollector.writer",
              "org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter");
          Class<?> myWriter = conf.getClassByName(writerClassName);
          ChukwaWriter st = (ChukwaWriter) myWriter.newInstance();
          st.init();

          • Remove all writer initialization from CollectorStub.java.
          • Move all the pipeline-creation code into the init method of a PipelineWriter class, instead of ServletCollector.java.

          That way the writer interface is still simple, the collector class stays very simple, and this does not prevent anybody from having a specific writer implementation.
          So at the end you have:

          public class PipelineWriter implements ChukwaWriter {

            public void init() throws WriterException {
              if (conf.get("chukwaCollector.pipeline") != null) {
                String pipeline = conf.get("chukwaCollector.pipeline");
                try {
                  String[] classes = pipeline.split(",");
                  ArrayList<PipelineStageWriter> stages = new ArrayList<PipelineStageWriter>();
                  [...]
                }
              }
            }

            public void add(List<Chunk> chunks) throws WriterException {
              // call all PipelineStageWriters in sequence
            }
          }
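
          For concreteness, one way the elided [...] wiring could look; this is a guess at the shape, not the committed patch. It instantiates each stage by class name and chains the stages together (setNextStage is the hypothetical hookup method):

          // Inside PipelineWriter.init(), continuing the try block above.
          String[] classes = pipeline.split(",");
          ArrayList<PipelineStageWriter> stages = new ArrayList<PipelineStageWriter>();
          for (String className : classes) {
            Class<?> stageClass = conf.getClassByName(className.trim());
            stages.add((PipelineStageWriter) stageClass.newInstance());
          }
          // Wire each stage to its successor.
          for (int i = 0; i < stages.size() - 1; i++) {
            stages.get(i).setNextStage(stages.get(i + 1));
          }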

          Ari Rabkin added a comment -

          Revised to take Jerome's feedback into account. Also added some previously missing Apache license headers.

          Eric Yang added a comment -

          Does pipeline2.patch depend on pipeline.patch? I can't get pipeline2.patch to apply by itself.

          Ari Rabkin added a comment -

          No idea what was wrong with the previous patch; try this one.

          Ari Rabkin added a comment -

          The issue with the previous patch was that a non-patch SVN change (a mode change) got rolled in as well. This has been removed, and the latest patch should be good.

          Eric Yang added a comment -

          src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java doesn't exist in the public SVN. pipeline3.patch does not contain the whole file for PipelineableWriter.java. Please make sure your patch contains PipelineableWriter.java as a whole file. Thanks.

          Ari Rabkin added a comment -

          Whoops, I see what I did. A local rename broke things. Let's try this. Tested on TRUNK, and it seems to work.

          Eric Yang added a comment -

          +1 for pipeline writer. pipeline4.patch is the good patch.

          Chris Douglas added a comment -

          I committed this. Thanks, Ari

          Hudson added a comment -

          Integrated in Hadoop-trunk #756 (see http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/756/ )
          Robert Chansler added a comment -

          Editorial pass over all release notes prior to publication of 0.21.

          Ari Rabkin added a comment -

          It's not obvious to me whether purely Chukwa-related notes should still be in the Hadoop release notes, since we're now fairly decoupled from the Hadoop release schedule.


            People

            • Assignee: Ari Rabkin
            • Reporter: Ari Rabkin
            • Votes: 0
            • Watchers: 5
