Details
-
Bug
-
Status: Open
-
P3
-
Resolution: Unresolved
-
2.19.0, 2.20.0, 2.21.0, 2.22.0, 2.23.0, 2.24.0, 2.25.0
-
None
-
Beam 2.19.0-2.25.0 with sql extension. JDK 8.
Description
When querying against a nested Row using the SqlTransform, if the object is too deeply nested (depth >4) it will cause the calcite ExpressionWriter to throw IndexOutOfBoundsException. This level of nesting is common with protobufs and similar.
Â
java.lang.IndexOutOfBoundsException: Index: 20, Size: 20 at java.util.ArrayList.rangeCheck(ArrayList.java:659) at java.util.ArrayList.get(ArrayList.java:435) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.begin(ExpressionWriter.java:78) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.begin(ExpressionWriter.java:101) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:73) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.UnaryExpression.accept(UnaryExpression.java:49) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:80) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32) at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.AbstractNode.toString(AbstractNode.java:51) at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:215) at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:118) at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542) at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493) at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69) at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39) at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:134) at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86) at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542) at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493) at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:262) at com.etsy.r2k.unbounded.proc.sql.ProcessorTest.doTest(ProcessorTest.java:61) at com.etsy.r2k.unbounded.proc.sql.ProcessorTest.testRecursionError(ProcessorTest.java:37)
Test Code:
import org.apache.beam.sdk.extensions.sql.SqlTransform; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class ProcessorTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); @Before public void setup() { pipeline.enableAutoRunIfMissing(false).enableAbandonedNodeEnforcement(false); } @Test public void testRecursion() { doTest(4); } @Test public void testRecursionError() { // TODO file bug doTest(5); } private void doTest(final int depth) { Schema schema = Schema.builder().addStringField("foo").build(); Row row = Row.withSchema(schema).addValue("your boat").build(); for (int i = 0; i < depth; i++) { schema = Schema.builder().addRowField(String.format("foo%d", i), schema).build(); row = Row.withSchema(schema).addValue(row).build(); } final TestStream<Row> stream = TestStream.create(schema).addElements(row).advanceWatermarkToInfinity(); final PCollection<Row> input = pipeline.apply(stream); final PCollectionTuple tuple = PCollectionTuple.of("a", input); final StringBuilder selector = new StringBuilder(); for (int i = depth - 1; i >= 0; i--) { selector.append(".`foo").append(i).append('`'); } final SqlTransform transform = SqlTransform.query("SELECT a" + selector + ".foo FROM a"); final PCollection<String> results = tuple .apply("query", transform) .apply( MapElements.into(TypeDescriptors.strings()) .via((SerializableFunction<Row, String>) r -> r.getString("foo"))); PAssert.that(results).containsInAnyOrder("your boat"); pipeline.run().waitUntilFinish(); } }
Attachments
Issue Links
- is blocked by
-
BEAM-9379 Upgrade to Calcite 1.26.0
- Triage Needed
- is caused by
-
CALCITE-3237 ExpressionWriter not updating indent correctly
- Closed