Beam / BEAM-3210

Problem with the use of waitUntilFinish() in DirectRunner

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Not A Problem
    • Affects Version/s: 2.1.0
    • Fix Version/s: Not applicable
    • Component/s: runner-direct
    • Labels: None
    • Environment: Ubuntu 14.04.3 LTS
      JDK 1.8
      Beam 2.1.0
      Maven 3.5.0

    Description

      Dear sir,

      The description of waitUntilFinish() is "waits until the pipeline finishes and returns the final status."

      In my project, I use a static variable, a list, to record the contents of a PCollection.

      I therefore call "p.run().waitUntilFinish()" to wait until the pipeline finishes, so that no records are lost from the list.

      Unfortunately, the list sometimes contains a "null" value instead of the actual value.

      To explain clearly, I provide my Java code below.
      "import java.io.IOException;
      import java.util.ArrayList;
      import org.apache.beam.sdk.Pipeline;
      import org.apache.beam.sdk.options.PipelineOptions;
      import org.apache.beam.sdk.options.PipelineOptionsFactory;
      import org.apache.beam.sdk.transforms.Create;
      import org.apache.beam.sdk.transforms.DoFn;
      import org.apache.beam.sdk.transforms.Mean;
      import org.apache.beam.sdk.transforms.ParDo;
      import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
      import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
      import org.apache.beam.sdk.values.PCollection;

      public class BeamTestStatic extends Thread {
        // Static list shared by every invocation of the DoFn below.
        public static ArrayList<Double> myList = new ArrayList<Double>();

        public static class StaticTest extends DoFn<Double, Void> {
          @ProcessElement
          public void test(ProcessContext c) {
            // Record each element of the PCollection in the shared list.
            myList.add(c.element());
          }
        }

        public static void main(String[] args) throws IOException {
          StaticTest testa = new StaticTest();
          PipelineOptions options = PipelineOptionsFactory.create();
          Pipeline p = Pipeline.create(options);
          PCollection<Double> data = p.apply("Rawdata",
              Create.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0));
          PCollection<Void> listtest = data.apply(ParDo.of(testa));
          // Block until the pipeline has finished before reading the list.
          p.run().waitUntilFinish();

          System.out.println("mylist_size_a=" + myList.size());

          for (int i = 0; i < myList.size(); i++) {
            System.out.println("mylist_data=" + myList.get(i));
          }
        }
      }
      "

      The output of my code is:
      "mylist_size_a=10
      mylist_data=null
      mylist_data=4.0
      mylist_data=5.0
      mylist_data=9.0
      mylist_data=6.0
      mylist_data=1.0
      mylist_data=7.0
      mylist_data=8.0
      mylist_data=10.0
      mylist_data=3.0"
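
One plausible reading of the output above, sketched here as an assumption rather than a confirmed diagnosis: java.util.ArrayList is not thread-safe, and if the runner calls the @ProcessElement method from more than one thread, concurrent add() calls can lose writes or leave null slots even though waitUntilFinish() has returned. The standalone sketch below uses no Beam classes. The class name SharedListSketch and the helpers fill and countNulls are hypothetical names introduced only for illustration. It appends to a plain ArrayList and to a Collections.synchronizedList wrapper from several threads and prints the size and null count of each.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SharedListSketch {

  // Appends `perThread` values to `target` from each of `threads` worker
  // threads and returns `target` once every worker has finished.
  public static List<Double> fill(List<Double> target, int threads, int perThread) {
    List<Thread> workers = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      final int offset = t * perThread;
      Thread worker = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          try {
            target.add((double) (offset + i));
          } catch (RuntimeException e) {
            // An unsynchronized list may fail outright during a racy resize;
            // the element is simply dropped in that case.
          }
        }
      });
      workers.add(worker);
      worker.start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for workers", e);
      }
    }
    return target;
  }

  // Counts null entries; stops early if a corrupted list reports a size
  // larger than its backing storage.
  public static long countNulls(List<Double> list) {
    long nulls = 0;
    for (int i = 0; i < list.size(); i++) {
      try {
        if (list.get(i) == null) {
          nulls++;
        }
      } catch (IndexOutOfBoundsException e) {
        break;
      }
    }
    return nulls;
  }

  public static void main(String[] args) {
    // The unsafe result varies from run to run; the synchronized one does not.
    List<Double> unsafe = fill(new ArrayList<>(), 8, 10_000);
    List<Double> safe = fill(Collections.synchronizedList(new ArrayList<>()), 8, 10_000);
    System.out.println("unsafe: size=" + unsafe.size() + " nulls=" + countNulls(unsafe));
    System.out.println("safe:   size=" + safe.size() + " nulls=" + countNulls(safe));
  }
}
```

If this reading applies, wrapping the static list the same way in the pipeline code should keep every element, although the order of the elements would still not be guaranteed.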

      If you have any further information, I would be glad to hear it.

      Thanks

      Rick

          People

            Assignee: Luke Cwik (lcwik)
            Reporter: Rick Lin (Ricklin)