Details
- Bug
- Status: Closed
- Critical
- Resolution: Fixed
- 0.6.0
- None
Description
When a join is performed on two PTables, a DoFn's initialize method is called twice, whereas its cleanup method is called only once.
Sample Test:
final Configuration config = HBaseTest.getConf();
final Pipeline pipeline = new MRPipeline(MaraCheckTest.class, config);
final PCollection<String> collectionHelper1 = pipeline.readTextFile(
    HBaseTest.class.getResource("/HbaseTestFile.txt").toString());
final PCollection<String> collectionHelper2 = pipeline.readTextFile(
    HBaseTest.class.getResource("/HbaseTestFile2.txt").toString());
final PTable<Integer, String> ptable1 = collectionHelper2.parallelDo("Creating table",
    new DoFnCheck(), Avros.tableOf(Avros.ints(), Avros.strings()));
final PTable<Integer, String> ptable2 = collectionHelper1.parallelDo("Creating table2",
    new DoFnCheck2(), Avros.tableOf(Avros.ints(), Avros.strings()));
final PTable<Integer, Pair<String, String>> joinedTable = ptable1.join(ptable2);
final PCollection<String> joinedStrings = joinedTable.parallelDo(
    new MapFn<Pair<Integer, Pair<String, String>>, String>() {
      private static final long serialVersionUID = -8796426750247480646L;

      @Override
      public String map(final Pair<Integer, Pair<String, String>> input) {
        return input.second().first() + "/" + input.second().second();
      }
    }, Avros.strings());
System.out.println(joinedStrings.materialize().iterator().hasNext());
The two DoFn classes (DoFnCheck and DoFnCheck2) look something like this:
public class DoFnCheck extends DoFn<String, Pair<Integer, String>> {
  private static final long serialVersionUID = 6780749658216132026L;

  @Override
  public void initialize() {
    System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
  }

  @Override
  public void cleanup(final Emitter<Pair<Integer, String>> emitter) {
    System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
  }

  @Override
  public void process(final String input, final Emitter<Pair<Integer, String>> emitter) {
    System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
    final Pair<Integer, String> pair = new Pair<Integer, String>(1, input);
    emitter.emit(pair);
  }
}
The console looks like this:
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
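To make the imbalance checkable in a test rather than read off the console, the lifecycle calls could be counted. The following is only a minimal sketch: LifecycleCounter and its methods are hypothetical names and not part of Crunch. It assumes the counter would be held in a static field and that the pipeline runs in a single JVM (for example with the local job runner), since DoFn instances are serialized before execution. The main method simply replays the call order from the console output above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Crunch: counts lifecycle callbacks so an
// initialize/cleanup imbalance can be asserted instead of read off the console.
final class LifecycleCounter {
    private final AtomicInteger initialized = new AtomicInteger();
    private final AtomicInteger cleanedUp = new AtomicInteger();

    // Intended to be called from DoFn.initialize().
    void onInitialize() {
        initialized.incrementAndGet();
    }

    // Intended to be called from DoFn.cleanup(...).
    void onCleanup() {
        cleanedUp.incrementAndGet();
    }

    int initializeCalls() {
        return initialized.get();
    }

    int cleanupCalls() {
        return cleanedUp.get();
    }

    // True when every initialize call has been matched by a cleanup call.
    boolean isBalanced() {
        return initialized.get() == cleanedUp.get();
    }

    public static void main(String[] args) {
        LifecycleCounter counter = new LifecycleCounter();
        // Replay the order seen in the report: initialize, cleanup, initialize.
        counter.onInitialize();
        counter.onCleanup();
        counter.onInitialize();
        System.out.println("initialize=" + counter.initializeCalls()
            + " cleanup=" + counter.cleanupCalls()
            + " balanced=" + counter.isBalanced());
        // prints: initialize=2 cleanup=1 balanced=false
    }
}
```

In a test, DoFnCheck would call onInitialize() from initialize() and onCleanup() from cleanup(), and the test would assert isBalanced() once the pipeline has finished.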