  Crunch (Retired) / CRUNCH-232

DoFn initialize() method is called twice, whereas cleanup() is called only once, when a join is performed on two PTables.


Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.7.0
    • Component/s: MapReduce Patterns
    • Labels: None

    Description

      A DoFn's initialize() method is called twice, whereas its cleanup() method is called only once, when a join is performed on two PTables.
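      The report rests on the expectation that each initialize() call on a DoFn is matched by exactly one cleanup() call, with process() calls in between. The following is a minimal, self-contained sketch of that contract, not Crunch code: LifecycleRecorder is a hypothetical class that only mirrors the method names so a test can check whether a recorded call sequence is balanced.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Crunch): records the lifecycle calls of a
// DoFn-like object so a test can check initialize()/cleanup() pairing.
public class LifecycleRecorder {
    private final List<String> events = new ArrayList<>();

    public void initialize() { events.add("initialize"); }
    public void process(String input) { events.add("process"); }
    public void cleanup() { events.add("cleanup"); }

    public List<String> events() { return events; }

    // True if the events match (initialize process* cleanup)*, i.e. there is
    // no dangling initialize and no cleanup without a preceding initialize.
    public boolean isBalanced() {
        boolean open = false;
        for (String e : events) {
            switch (e) {
                case "initialize":
                    if (open) return false;  // initialize again before cleanup
                    open = true;
                    break;
                case "process":
                    if (!open) return false; // process outside initialize/cleanup
                    break;
                case "cleanup":
                    if (!open) return false; // cleanup without initialize
                    open = false;
                    break;
                default:
                    return false;
            }
        }
        return !open; // every initialize must have been closed by a cleanup
    }

    public static void main(String[] args) {
        // Sequence seen in the report: initialize, process, cleanup, initialize.
        LifecycleRecorder reported = new LifecycleRecorder();
        reported.initialize();
        reported.process("line");
        reported.cleanup();
        reported.initialize();
        System.out.println("reported sequence balanced: " + reported.isBalanced());

        // Expected sequence: one initialize, some process calls, one cleanup.
        LifecycleRecorder expected = new LifecycleRecorder();
        expected.initialize();
        expected.process("line");
        expected.cleanup();
        System.out.println("expected sequence balanced: " + expected.isBalanced());
    }
}
```

      Running main prints "reported sequence balanced: false" followed by "expected sequence balanced: true": the trailing initialize() in the reported sequence is never closed by a cleanup().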

      Sample Test:

       
              final Configuration config = HBaseTest.getConf();
              final Pipeline pipeline = new MRPipeline(MaraCheckTest.class, config);

              // Read the two input files as collections of lines.
              final PCollection<String> collectionHelper1 = pipeline.readTextFile(HBaseTest.class.getResource(
                      "/HbaseTestFile.txt").toString());

              final PCollection<String> collectionHelper2 = pipeline.readTextFile(HBaseTest.class.getResource(
                      "/HbaseTestFile2.txt").toString());

              // Turn each collection into a PTable keyed by an Integer (DoFnCheck emits key 1 for every line).
              final PTable<Integer, String> ptable1 = collectionHelper2.parallelDo("Creating table", new DoFnCheck(),
                      Avros.tableOf(Avros.ints(), Avros.strings()));

              final PTable<Integer, String> ptable2 = collectionHelper1.parallelDo("Creating table2", new DoFnCheck2(),
                      Avros.tableOf(Avros.ints(), Avros.strings()));

              // Join the two tables on their Integer key.
              final PTable<Integer, Pair<String, String>> joinedTable = ptable1.join(ptable2);

              // Concatenate the two joined values as "left/right".
              final PCollection<String> joinedStrings = joinedTable.parallelDo(
                      new MapFn<Pair<Integer, Pair<String, String>>, String>() {
                          private static final long serialVersionUID = -8796426750247480646L;

                          @Override
                          public String map(final Pair<Integer, Pair<String, String>> input) {
                              return input.second().first() + "/" + input.second().second();
                          }
                      }, Avros.strings());

              // Iterating over the materialized collection triggers the pipeline run.
              System.out.println(joinedStrings.materialize().iterator().hasNext());
       

      The two DoFns (DoFnCheck and DoFnCheck2) look roughly like this; DoFnCheck is shown:

       
      import org.apache.crunch.DoFn;
      import org.apache.crunch.Emitter;
      import org.apache.crunch.Pair;

      // Emits every input line keyed by the constant 1 and logs each lifecycle call.
      public class DoFnCheck extends DoFn<String, Pair<Integer, String>> {
          private static final long serialVersionUID = 6780749658216132026L;

          @Override
          public void initialize() {
              System.out
                      .println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
          }

          @Override
          public void cleanup(final Emitter<Pair<Integer, String>> emitter) {
              System.out
                      .println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
          }

          @Override
          public void process(final String input, final Emitter<Pair<Integer, String>> emitter) {
              System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");

              final Pair<Integer, String> pair = new Pair<Integer, String>(1, input);

              emitter.emit(pair);
          }
      }
      

      The console output looks like this:

      !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
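      To make the imbalance checkable in a test rather than by eye, the marker lines in a captured log can simply be counted. The sketch below is a hypothetical helper (MarkerCount is not part of Crunch or of the attached patch); it uses the four console lines above, with the runs of '!' shortened.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: counts lifecycle marker lines in a captured console log.
// The markers are the substrings DoFnCheck prints; the '!' banners are shortened.
public class MarkerCount {

    // Number of log lines that contain the given marker text.
    static long count(List<String> lines, String marker) {
        return lines.stream().filter(line -> line.contains(marker)).count();
    }

    public static void main(String[] args) {
        // The four console lines from the report, in order.
        List<String> log = Arrays.asList(
                "!!!I'm initializing!!!",
                "!!!Process!!!",
                "!!!I'm cleaned up!!!",
                "!!!I'm initializing!!!");

        long inits = count(log, "I'm initializing");
        long cleanups = count(log, "I'm cleaned up");
        System.out.println("initialize: " + inits + ", cleanup: " + cleanups);
        // With a correct lifecycle the two counts would be equal.
        System.out.println("balanced: " + (inits == cleanups));
    }
}
```

      For the log in this report it prints "initialize: 2, cleanup: 1" and "balanced: false".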

      Attachments

        1. CRUNCH-232.patch (11 kB, Josh Wills)


          People

            Assignee: Unassigned
            Reporter: Anuj Ojha (anujojha)
            Votes: 0
            Watchers: 3
