Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:

      Description

As you already know, Apache Beam provides a unified programming model for both batch and streaming inputs.

The APIs are centered on filtering and transforming data, so we'll need to implement a data-processing runner, similar to https://github.com/dapurv5/MapReduce-BSP-Adapter/blob/master/src/main/java/org/apache/hama/mapreduce/examples/WordCount.java

Also, implementing a similarity join could be interesting. According to http://www.ruizhang.info/publications/TPDS2015-Heads_Join.pdf, Apache Hama clearly outperforms both Apache Hadoop and Apache Spark on this workload.

Since it consists of transformation, aggregation, and partitioning computations, I think it can be implemented with the Apache Beam APIs.
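      For illustration, the rough shape of such a pipeline with the Beam APIs (a minimal sketch; the paths and the tokenizing logic are just placeholders):

        // Sketch: transformation (ParDo), aggregation (Count), and the
        // key-based grouping that Count implies under the hood.
        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.Read.from("/tmp/input"))
         .apply(ParDo.of(new DoFn<String, String>() {      // transformation
           @ProcessElement
           public void processElement(ProcessContext c) {
             for (String token : c.element().split("\\s+")) {
               if (!token.isEmpty()) {
                 c.output(token);
               }
             }
           }
         }))
         .apply(Count.<String>perElement())                // aggregation
         .apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
           @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(c.element().getKey() + ": " + c.element().getValue());
           }
         }))
         .apply(TextIO.Write.to("/tmp/output"));
        p.run();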

        Issue Links

          Activity

          seedengine JongYoon Lim added a comment -

          Do you mean that each superstep can be executed in the data pipeline as a PCollection? Could you add more details in case I didn't understand correctly?

          udanax Edward J. Yoon added a comment -

          Hi, I haven't looked at Dataflow (Apache Beam) closely, but:

          >> Do you mean that each superstep can be executed in data pipeline as a pcollection?

          I guess yes, or a single job can be executed, as the case may be.

          If you're interested in working on this, you can refer to https://github.com/dataArtisans/flink-dataflow/blob/master/runner/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java

          And before we do this, I guess HAMA-940 and a data-processing BSP should come first. Please feel free to share your opinion and contribute patches.

          If you have any questions, let me know.

          seedengine JongYoon Lim added a comment -

          Thank you for the information. I'm interested in this feature, so I'll start by analyzing the Flink runner.

          udanax Edward J. Yoon added a comment -

          Just FYI, Apache Beam's basic example is word count. I guess the batch mode can be similar to org.apache.hama.examples.PiEstimator: (n - 1) tasks parse and count the words, and 1 task aggregates the word counts and emits the final result. I'm not sure about the streaming mode, so you'll need to check how it handles I/O.
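          For illustration, the PiEstimator-style aggregation pattern in Hama BSP looks roughly like this (a minimal sketch, not the actual example code):

            import java.io.IOException;
            import org.apache.hadoop.io.LongWritable;
            import org.apache.hadoop.io.Text;
            import org.apache.hama.bsp.BSP;
            import org.apache.hama.bsp.BSPPeer;
            import org.apache.hama.bsp.sync.SyncException;

            public class AggregatingBSP extends
                BSP<LongWritable, Text, Text, LongWritable, LongWritable> {

              @Override
              public void bsp(
                  BSPPeer<LongWritable, Text, Text, LongWritable, LongWritable> peer)
                  throws IOException, SyncException, InterruptedException {
                long localCount = 0; // ... each task counts words in its own split ...

                // every task sends its partial count to one designated peer
                String master = peer.getPeerName(0);
                peer.send(master, new LongWritable(localCount));
                peer.sync();

                // the designated peer aggregates and emits the final result
                if (peer.getPeerName().equals(master)) {
                  long total = 0;
                  LongWritable received;
                  while ((received = peer.getCurrentMessage()) != null) {
                    total += received.get();
                  }
                  peer.write(new Text("total"), new LongWritable(total));
                }
              }
            }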

          seedengine JongYoon Lim added a comment -

          Yes, I'll check the streaming mode as well.

          seedengine JongYoon Lim added a comment -

          FlinkPipelineRunner internally has translators for both pipelines and transforms. The translator translates Beam operators to their Flink counterparts and saves the relevant information in a TranslationContext, which is then used to run the Flink job. I think this patch can start with a simple translator for batch jobs.
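          For illustration, the translator pattern roughly follows this shape (a simplified sketch with assumed names; the real interfaces are in the flink-dataflow code linked above):

            import java.util.HashMap;
            import java.util.Map;
            import org.apache.beam.sdk.transforms.PTransform;

            // One translator per Beam transform type, looked up from a registry;
            // TranslationContext is the context object mentioned above.
            interface TransformTranslator<T extends PTransform<?, ?>> {
              void translate(T transform, TranslationContext context);
            }

            class Translators {
              private static final Map<Class<?>, TransformTranslator<?>> TRANSLATORS =
                  new HashMap<Class<?>, TransformTranslator<?>>();

              static <T extends PTransform<?, ?>> void register(
                  Class<T> clazz, TransformTranslator<T> translator) {
                TRANSLATORS.put(clazz, translator);
              }

              static TransformTranslator<?> get(PTransform<?, ?> transform) {
                // look up the translator registered for this transform's type
                return TRANSLATORS.get(transform.getClass());
              }
            }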

          seedengine JongYoon Lim added a comment - edited

          Hi, it took some time to understand the Beam API and the Spark and Flink runners for Beam. It seems that Beam's transforms can be translated to Hama's API as follows, and the BSP for dataflow could be similar to SuperstepBSP. (If I have misunderstood anything, please correct me.)
          BEAM -> HAMA
          ParDo -> Superstep
          Read.Bound -> RecordReader
          Write.Bound -> RecordWriter
          Combine -> Combiner
          GroupByKey -> ?

          I'm going to start with batch mode first, until Hama's streaming is ready, and I'll add sub-tasks for this soon.
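          For illustration, the ParDo -> Superstep row might end up as something like this (a rough sketch; DoFnSuperstep and the context plumbing are hypothetical):

            import java.io.IOException;
            import org.apache.beam.sdk.transforms.DoFn;
            import org.apache.beam.sdk.values.KV;
            import org.apache.hadoop.io.MapWritable;
            import org.apache.hama.bsp.BSPPeer;
            import org.apache.hama.bsp.KeyValuePair;
            import org.apache.hama.bsp.Superstep;

            // Wraps a Beam DoFn so it can run as one Hama superstep.
            public class DoFnSuperstep<K, V> extends
                Superstep<K, V, K, V, MapWritable> {

              private final DoFn<KV<K, V>, KV<K, V>> doFn;

              public DoFnSuperstep(DoFn<KV<K, V>, KV<K, V>> doFn) {
                this.doFn = doFn;
              }

              @Override
              protected void compute(BSPPeer<K, V, K, V, MapWritable> peer)
                  throws IOException {
                KeyValuePair<K, V> pair;
                while ((pair = peer.readNext()) != null) {
                  // invoke doFn on KV.of(pair.getKey(), pair.getValue());
                  // the ProcessContext plumbing that collects output is omitted
                }
              }
            }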

          udanax Edward J. Yoon added a comment -

          https://cloud.google.com/dataflow/examples/wordcount-example

          This page describes the Beam concepts well. The flow is as follows:

              Creating the Pipeline
              Applying transforms to the Pipeline
                  Reading input (in this example: reading text files)
                  Applying ParDo transforms
                  Applying SDK-provided transforms (in this example: Count)
                  Writing output (in this example: writing to Google Cloud Storage)
              Running the Pipeline
          

          Once we've created the Hama pipeline, we should be able to run the program like below:

            public static void main(String[] args) {
              // Create a pipeline parameterized by command-line flags.
              Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

              p.apply(TextIO.Read.from("gs://..."))   // Read input.
               .apply(new CountWords())               // Do some processing.
               .apply(TextIO.Write.to("gs://..."));   // Write output.

              // Run the pipeline.
              p.run();
            }
          

          For I/O operations, you can refer to https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java (instead of org.apache.hadoop.mapreduce.lib.input.FileInputFormat, you should use https://github.com/apache/hama/blob/master/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java)

          >> BSP for dataflow could be similar to SuperstepBSP

          I think so. GroupByKey seems to be a built-in operation that groups records by key. We should implement it using a superstep.
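          For illustration, such a superstep could use BSP messaging to shuffle records to the peer that owns each key (a rough sketch with assumed names; pendingRecords() stands in for however the previous superstep hands over its output):

            import java.io.IOException;
            import java.util.Map;
            import org.apache.hadoop.io.MapWritable;
            import org.apache.hadoop.io.Text;
            import org.apache.hama.bsp.BSPPeer;
            import org.apache.hama.bsp.Superstep;

            public class GroupByKeySuperstep extends
                Superstep<Text, Text, Text, Text, MapWritable> {

              @Override
              protected void compute(BSPPeer<Text, Text, Text, Text, MapWritable> peer)
                  throws IOException {
                // Route each record to the peer that owns its key, so that
                // equal keys meet on the same peer after the barrier sync.
                for (Map.Entry<Text, Text> e : pendingRecords().entrySet()) {
                  int owner = Math.abs(e.getKey().hashCode()) % peer.getNumPeers();
                  MapWritable msg = new MapWritable();
                  msg.put(e.getKey(), e.getValue());
                  peer.send(peer.getPeerName(owner), msg);
                }
                // In the next superstep, after the barrier, each peer drains
                // peer.getCurrentMessage() and groups its records locally by key.
              }

              // Hypothetical handoff from the previous superstep.
              private Map<Text, Text> pendingRecords() {
                return new java.util.HashMap<Text, Text>();
              }
            }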

          seedengine JongYoon Lim added a comment -

          Thank you for your feedback. Do you think it's better to branch from Hama for this, or to have an independent GitHub repo?

          udanax Edward J. Yoon added a comment -

          Why don't we contribute this feature to Apache Beam directly? https://github.com/apache/incubator-beam/tree/master/runners

          seedengine JongYoon Lim added a comment -

          Yes, I can add a PR link to https://issues.apache.org/jira/browse/BEAM-612 once PoC is done.

          udanax Edward J. Yoon added a comment -

          >> once PoC is done

          Great. If you need any help, feel free to let me know.

          seedengine JongYoon Lim added a comment -

          Hi Edward, could you give me some idea of the recommended way to create, on a groom server, the same instance that the (Beam) translator creates on the master?

          udanax Edward J. Yoon added a comment -

          I don't quite understand; could you please share your progress?

          seedengine JongYoon Lim added a comment -

          Hi Edward. First of all, sorry for the long delay.

          This is the process for testing the beam-hama-runner:
          1. Define a test ParDo, for example, as below.

              PCollection<KV<Text, LongWritable>> output = input.apply("test", ParDo.of(new DoFn<KV<Text, LongWritable>, KV<Text, LongWritable>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  for (String word : c.element().toString().split("[^a-zA-Z']+")) {
                    if (!word.isEmpty()) {
                      c.output(KV.of(new Text(word), new LongWritable(11)));
                    }
                  }
                }
              }));
          

          2. To translate the ParDo, I can pass it to DoFnFunction, which is a subclass of Superstep and holds an OldDoFn.ProcessContext. Here, I'd like to create the DoFn instance in the Hama cluster after all translation is finished, and I'm not sure how I can do that easily...

            private static <InputT, OutputT> TransformTranslator<ParDo.Bound<InputT, OutputT>> parDo() {
              return new TransformTranslator<ParDo.Bound<InputT, OutputT>>() {
                @Override
                public void translate(final ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
                  // addSuperstep() expects a Superstep class, e.g.:
                  //   context.addSuperstep(TestSuperStep.class);
                  // but the translated DoFn is only available here as an instance:
                  DoFnFunction dofn = new DoFnFunction((OldDoFn<KV, KV>) transform.getFn());
                  //   context.addSuperstep(dofn.getClass());
                }
              };
            }
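          One idea I've considered (just an assumption, roughly how other Beam runners ship DoFn instances to workers): serialize the translated instance into the job configuration on the master and deserialize it on the grooms. A minimal sketch with illustrative names:

            import java.io.ByteArrayInputStream;
            import java.io.ByteArrayOutputStream;
            import java.io.IOException;
            import java.io.ObjectInputStream;
            import java.io.ObjectOutputStream;
            import java.io.Serializable;
            import java.util.Base64;
            import org.apache.hadoop.conf.Configuration;

            // Ship a translated instance to the grooms via the job configuration,
            // using plain Java serialization + Base64 (objects must be Serializable).
            public final class InstanceShipping {

              static void storeInstance(Configuration conf, String key, Serializable obj)
                  throws IOException {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bytes);
                out.writeObject(obj);
                out.close();
                conf.set(key, Base64.getEncoder().encodeToString(bytes.toByteArray()));
              }

              static Object loadInstance(Configuration conf, String key)
                  throws IOException, ClassNotFoundException {
                byte[] data = Base64.getDecoder().decode(conf.get(key));
                ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
                try {
                  return in.readObject();
                } finally {
                  in.close();
                }
              }
            }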
          
          udanax Edward J. Yoon added a comment -

          Here's my skeleton code with an example that counts words. You should implement the HamaPipelineRunner: just translate and execute the batch job. I think you can find out how to translate these from Flink's code: https://github.com/dataArtisans/flink-dataflow/blob/aad5d936abd41240f3e15d294ea181fb9cca05e0/runner/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java#L410

          public class WordCountTest {
          
            static final String[] WORDS_ARRAY = new String[] { "hi there", "hi",
                "hi sue bob", "hi sue", "", "bob hi" };
          
            static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
          
            static final String[] COUNTS_ARRAY = new String[] { "hi: 5", "there: 1",
                "sue: 2", "bob: 2" };
          
            /**
             * Example test that tests a PTransform by using an in-memory input and
             * inspecting the output.
             */
            @Test
            @Category(RunnableOnService.class)
            public void testCountWords() throws Exception {
              HamaOptions options = PipelineOptionsFactory.as(HamaOptions.class);
              options.setRunner(HamaPipelineRunner.class);
              Pipeline p = Pipeline.create(options);
          
              PCollection<String> input = p.apply(Create.of(WORDS).withCoder(
                  StringUtf8Coder.of()));
          
              PCollection<String> output = input
                  .apply(new WordCount())
                  .apply(MapElements.via(new FormatAsTextFn()));
                  //.apply(TextIO.Write.to("/tmp/result"));
          
              PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
              p.run().waitUntilFinish();
            }
          
            public static class WordCount extends
                PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
              
              private static final long serialVersionUID = 1L;
          
              @Override
              public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
          
                // Convert lines of text into individual words.
                PCollection<String> words = lines.apply(ParDo.of(new DoFn<String, String>() {
                  private static final long serialVersionUID = 1L;
                  private final Aggregator<Long, Long> emptyLines =
                      createAggregator("emptyLines", new Sum.SumLongFn());
          
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    if (c.element().trim().isEmpty()) {
                      emptyLines.addValue(1L);
                    }
          
                    // Split the line into words.
                    String[] words = c.element().split("[^a-zA-Z']+");
          
                    // Output each word encountered into the output PCollection.
                    for (String word : words) {
                      if (!word.isEmpty()) {
                        c.output(word);
                      }
                    }
                  }
                }));
          
                // Count the number of times each word occurs.
                PCollection<KV<String, Long>> wordCounts = words.apply(Count
                    .<String> perElement());
          
                return wordCounts;
              }
            }
          
            // ///// TODO
            public static class HamaPipelineRunner extends
                PipelineRunner<HamaPipelineResult> {
          
              public static HamaPipelineRunner fromOptions(PipelineOptions x) {
                return new HamaPipelineRunner();
              }
          
              @Override
              public <Output extends POutput, Input extends PInput> Output apply(
                              PTransform<Input, Output> transform, Input input) {
                      return super.apply(transform, input);
              }
              
              @Override
              public HamaPipelineResult run(Pipeline pipeline) {
                // TODO Auto-generated method stub
                System.out.println("Executing pipeline using HamaPipelineRunner.");
          
                // TODO you need to translate pipeline to Hama program
                // and execute pipeline
                // return the result
                return null;
              }
          
            }
          
            public static class HamaPipelineResult implements PipelineResult {
          
              @Override
              public State getState() {
                // TODO Auto-generated method stub
                return null;
              }
          
              @Override
              public State cancel() throws IOException {
                // TODO Auto-generated method stub
                return null;
              }
          
              @Override
              public State waitUntilFinish(Duration duration) {
                // TODO Auto-generated method stub
                return null;
              }
          
              @Override
              public State waitUntilFinish() {
                // TODO Auto-generated method stub
                return null;
              }
          
              @Override
              public <T> AggregatorValues<T> getAggregatorValues(
                  Aggregator<?, T> aggregator) throws AggregatorRetrievalException {
                // TODO Auto-generated method stub
                return null;
              }
          
              @Override
              public MetricResults metrics() {
                // TODO Auto-generated method stub
                return null;
              }
          
            }
          
            public static interface HamaOptions extends PipelineOptions {
          
            }
          
          }
          
          seedengine JongYoon Lim added a comment -

          I added a link to the skeleton code for the Hama runner.
          I added a TranslationContext class for executing batch jobs: results (supersteps) from the translators are added to a list in TranslationContext, and after all translation is done, it executes the supersteps one by one. But when I add a result (superstep), it's an object, not a class, so I wonder whether there is an easy way to create the same objects on the grooms, since they are created on the master. I also wonder whether this approach is correct.
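          Roughly, the idea is shaped like this (a simplified sketch, not the exact code):

            import java.util.ArrayList;
            import java.util.List;
            import org.apache.hama.bsp.Superstep;

            // Collects translated supersteps while walking the pipeline; after
            // translation they are executed one by one as a batch job.
            public class TranslationContext {
              private final List<Superstep<?, ?, ?, ?, ?>> supersteps =
                  new ArrayList<Superstep<?, ?, ?, ?, ?>>();

              // The catch: this stores instances on the master, while the
              // grooms need some way to recreate the same objects.
              public void addSuperstep(Superstep<?, ?, ?, ?, ?> superstep) {
                supersteps.add(superstep);
              }

              public List<Superstep<?, ?, ?, ?, ?>> getSupersteps() {
                return supersteps;
              }
            }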

          udanax Edward J. Yoon added a comment -

          Cool, let me check.

          seedengine JongYoon Lim added a comment -

          Hi Edward, could you give me some feedback?

          seedengine JongYoon Lim added a comment -

          Hi Edward, could you create a branch called 'beam_support' on GitHub? These days I'm working on this issue again, and it seems to be working. Right now I'm working on my local Beam branch, but I think it'd be better to work on a Hama branch until the runner can support a recent Beam version. (I'm working against Beam's release-0.3.0-incubating, but they have already released 0.6.0.) I think working in Hama makes it easier to get review and feedback from other developers; after that, it could be contributed to the Beam project.

          udanax Edward J. Yoon added a comment -

          Sorry for the late reply.

          >> could you create a branch called 'beam_support' on github?

          Sure. Or you can also create the branch yourself, since you're a committer. I can do it this weekend.

          seedengine JongYoon Lim added a comment -

          So, I'll try to create the branch.
          Thank you.

          udanax Edward J. Yoon added a comment -
          # create a new branch inside your working directory 'current'
          git checkout -b HAMA-983
          # ... make some changes to the files ...
          # commit the changes to the branch
          git commit -a -m '[HAMA-983] Hama runner for DataFlow'
          # push the branch to origin
          git push origin HAMA-983
          Then go to your GitHub HAMA page and open a Pull Request.
          

          Hi JongYoon, you can create a new branch as shown above.


            People

            • Assignee: Unassigned
            • Reporter: udanax Edward J. Yoon
            • Votes: 1
            • Watchers: 4

              Dates

              • Created:
              • Updated:
