Details
- Type: Bug
- Status: Resolved
- Priority: P1
- Resolution: Not A Bug
- Affects Version: 2.37.0
- Fix Version: None
Description
If a pipeline written with the Java SDK contains a large number of PTransforms, it fails with a java.lang.StackOverflowError.
Code snippet to reproduce (based on the WordCount example):
public class WordCountWithNFilters {
  private static final int N = 100;

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);
    PCollection<String> words =
        p.apply(TextIO.read().from("file://tmp/input.txt"))
            .apply(
                FlatMapElements.into(TypeDescriptors.strings())
                    .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))));
    for (int i = 0; i < N; i++) {
      words = words.apply(Filter.by((String word) -> !word.isEmpty()));
    }
    words
        .apply(Count.perElement())
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (KV<String, Long> wordCount) ->
                        wordCount.getKey() + ": " + wordCount.getValue()))
        .apply(TextIO.write().to("wordcounts"));
    p.run().waitUntilFinish();
  }
}
Log while running with SparkRunner:
2022-03-14 19:01:30,465 [pool-3-thread-1] INFO org.apache.beam.runners.spark.SparkRunner$Evaluator - Evaluating View.CreatePCollectionView
[WARNING] org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.StackOverflowError
    at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom (SparkPipelineResult.java:73)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish (SparkPipelineResult.java:104)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish (SparkPipelineResult.java:92)
    at org.apache.beam.samples.sql.WordCountWithNFilters.main (WordCountWithNFilters.java:39)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.StackOverflowError
    at java.lang.ReflectiveOperationException.<init> (ReflectiveOperationException.java:89)
    at java.lang.reflect.InvocationTargetException.<init> (InvocationTargetException.java:72)
    at sun.reflect.GeneratedMethodAccessor39.invoke (Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteReplace (ObjectStreamClass.java:1244)
    at java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1136)
    at java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:348)
    at scala.collection.immutable.List$SerializationProxy.writeObject (List.scala:479)
    at sun.reflect.GeneratedMethodAccessor40.invoke (Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    ...
The value of N at which the error occurs seems to depend on the environment configuration.
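As a minimal illustration of the mechanism visible in the stack trace (not Beam-specific): Java's default object serialization recurses once per object reference, so writing a sufficiently long chain of linked objects overflows the thread stack. The class names below are made up for the demo; it only assumes java.io from the standard library:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DeepSerializationDemo {
  // Each Node references the next one; ObjectOutputStream.writeObject
  // descends one recursion level per link in the chain.
  static class Node implements Serializable {
    final Node next;
    Node(Node next) { this.next = next; }
  }

  public static void main(String[] args) throws IOException {
    // Build the chain iteratively (no recursion here), deep enough to
    // exhaust a default-sized thread stack during serialization.
    Node head = null;
    for (int i = 0; i < 1_000_000; i++) {
      head = new Node(head);
    }
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(head);
      System.out.println("serialized without error");
    } catch (StackOverflowError e) {
      System.out.println("StackOverflowError during serialization");
    }
  }
}
```

Because the limit is the thread stack size, raising it (e.g. with the JVM's -Xss option) moves the threshold, which is consistent with the observation that the failing value of N varies between environments.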