Details
- Type: Improvement
- Status: Resolved
- Priority: P2
- Resolution: Fixed
Description
In TFRecordIO, Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)) is an inefficient way of reading a large set of side inputs.
The pipeline can be sped up significantly by combining the PCollection<ResultT> into a single-element PCollection<List<ResultT>>.
Sample code: change
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L412
from
```
return input
    .getPipeline()
    .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
```
to
```
return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
```
where ToListCombineFn is defined as
```
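// Nested helper class. Needs java.util.ArrayList, java.util.Iterator, java.util.List
// and org.apache.beam.sdk.transforms.Combine.CombineFn.
public static class ToListCombineFn<ResultT> extends CombineFn<ResultT, List<ResultT>, List<ResultT>> {
  @Override
  public List<ResultT> createAccumulator() {
    return new ArrayList<>();
  }
  @Override
  public List<ResultT> addInput(List<ResultT> mutableAccumulator, ResultT input) {
    mutableAccumulator.add(input);
    return mutableAccumulator;
  }
  @Override
  public List<ResultT> mergeAccumulators(Iterable<List<ResultT>> accumulators) {
    Iterator<List<ResultT>> iter = accumulators.iterator();
    if (!iter.hasNext()) {
      return createAccumulator(); // nothing to merge: return an empty list
    }
    // Reuse the first accumulator and append the remaining ones to it.
    List<ResultT> merged = iter.next();
    while (iter.hasNext()) {
      merged.addAll(iter.next());
    }
    return merged;
  }
  @Override
  public List<ResultT> extractOutput(List<ResultT> accumulator) {
    return accumulator;
  }
}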
```
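To illustrate how the four methods cooperate, here is a minimal plain-Java sketch that mimics the way a runner might drive such a combine function: one accumulator per bundle, then a merge of the partial lists. It does not use Beam at all; ToListSketch and combineBundles are hypothetical names introduced only for this illustration, and the bundle layout is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch with no Beam dependency. ToListSketch and combineBundles
// are hypothetical names used only for illustration.
final class ToListSketch {

  // Mirrors createAccumulator(): a fresh empty list.
  static <T> List<T> createAccumulator() {
    return new ArrayList<>();
  }

  // Mirrors addInput(): append one element to the accumulator.
  static <T> List<T> addInput(List<T> accumulator, T input) {
    accumulator.add(input);
    return accumulator;
  }

  // Mirrors mergeAccumulators(): fold all partial lists into the first one.
  static <T> List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
    Iterator<List<T>> iter = accumulators.iterator();
    if (!iter.hasNext()) {
      return createAccumulator();
    }
    List<T> merged = iter.next();
    while (iter.hasNext()) {
      merged.addAll(iter.next());
    }
    return merged;
  }

  // Mirrors extractOutput(): the accumulator already is the result.
  static <T> List<T> extractOutput(List<T> accumulator) {
    return accumulator;
  }

  // Simulates a runner: each inner list stands for one bundle of inputs
  // (an assumption made for this sketch).
  static <T> List<T> combineBundles(List<List<T>> bundles) {
    List<List<T>> partials = new ArrayList<>();
    for (List<T> bundle : bundles) {
      List<T> acc = createAccumulator();
      for (T element : bundle) {
        acc = addInput(acc, element);
      }
      partials.add(acc);
    }
    return extractOutput(mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    List<List<String>> bundles = new ArrayList<>();
    bundles.add(Arrays.asList("a", "b"));
    bundles.add(Arrays.asList("c"));
    bundles.add(new ArrayList<>());
    System.out.println(combineBundles(bundles)); // prints [a, b, c]
  }
}
```

The ordering is deterministic here only because the sketch merges bundles sequentially; in a real pipeline the order of elements in the combined list should not be relied upon.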
Issue Links
- causes: BEAM-9563 TFRecordIO inefficient read from sideinput causing pipeline to be slow - fix (Open)