Beam / BEAM-9346

TFRecordIO inefficient read from side input causing pipeline to be slow

Details

    Description

      In TFRecordIO, Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)) is an inefficient way to read a large set of results as a side input.

      The pipeline can be sped up significantly by combining the PCollection<ResultT> into a single-element PCollection<List<ResultT>>.

      Sample code: in https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L412, change
      ```
      return input
          .getPipeline()
          .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
      ```

      to

      ```
      return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
      ```
      where ToListCombineFn is defined as
      ```
      // Imports belong at the top of the enclosing file; the class itself is nested.
      import java.util.ArrayList;
      import java.util.Iterator;
      import java.util.List;
      import org.apache.beam.sdk.transforms.Combine.CombineFn;

      /** Collects all input elements into a single List. */
      public static class ToListCombineFn<ResultT>
          extends CombineFn<ResultT, List<ResultT>, List<ResultT>> {

        @Override
        public List<ResultT> createAccumulator() {
          return new ArrayList<>();
        }

        @Override
        public List<ResultT> addInput(List<ResultT> mutableAccumulator, ResultT input) {
          mutableAccumulator.add(input);
          return mutableAccumulator;
        }

        @Override
        public List<ResultT> mergeAccumulators(Iterable<List<ResultT>> accumulators) {
          Iterator<List<ResultT>> iter = accumulators.iterator();
          if (!iter.hasNext()) {
            return new ArrayList<>();
          }
          // Reuse the first accumulator and append the remaining ones to it.
          List<ResultT> merged = iter.next();
          while (iter.hasNext()) {
            merged.addAll(iter.next());
          }
          return merged;
        }

        @Override
        public List<ResultT> extractOutput(List<ResultT> accumulator) {
          return accumulator;
        }
      }
      ```
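
      To make the order of calls easier to follow, here is a minimal plain-Java sketch of how a runner might drive the four methods above. It is not Beam code: ToListCombineSketch is a hypothetical stand-in that does not extend CombineFn, and combineBundles is an assumed driver that accumulates each bundle separately and then merges the partial lists.

      ```java
      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.Iterator;
      import java.util.List;

      // Hypothetical stand-in for ToListCombineFn; it does NOT extend Beam's CombineFn.
      class ToListCombineSketch<T> {

        List<T> createAccumulator() {
          return new ArrayList<>();
        }

        List<T> addInput(List<T> accumulator, T input) {
          accumulator.add(input);
          return accumulator;
        }

        List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
          Iterator<List<T>> iter = accumulators.iterator();
          if (!iter.hasNext()) {
            return new ArrayList<>();
          }
          // Reuse the first partial list and append the others to it.
          List<T> merged = iter.next();
          while (iter.hasNext()) {
            merged.addAll(iter.next());
          }
          return merged;
        }

        List<T> extractOutput(List<T> accumulator) {
          return accumulator;
        }

        // Assumed driver: one accumulator per bundle, then a single merge.
        static <T> List<T> combineBundles(List<List<T>> bundles) {
          ToListCombineSketch<T> fn = new ToListCombineSketch<>();
          List<List<T>> partials = new ArrayList<>();
          for (List<T> bundle : bundles) {
            List<T> acc = fn.createAccumulator();
            for (T element : bundle) {
              acc = fn.addInput(acc, element);
            }
            partials.add(acc);
          }
          return fn.extractOutput(fn.mergeAccumulators(partials));
        }

        public static void main(String[] args) {
          List<List<String>> bundles = Arrays.asList(
              Arrays.asList("a", "b"), Arrays.asList("c"), new ArrayList<String>());
          System.out.println(combineBundles(bundles)); // prints [a, b, c]
        }
      }
      ```

      In this sketch the element order is deterministic because the bundles are merged in sequence; a real runner gives no such ordering guarantee. With no bundles at all, the result is an empty list.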

            People

              piotr-szuberski Piotr Szuberski
              banpiao Ban Piao
                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 4h 40m

                  Slack

                    Issue deployment