Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-11140

IndexOutOfBoundsException with nested Row in SqlTransform

Details

    • Bug
    • Status: Open
    • P3
    • Resolution: Unresolved
    • 2.19.0, 2.20.0, 2.21.0, 2.22.0, 2.23.0, 2.24.0, 2.25.0
    • None
    • dsl-sql
    • Beam 2.19.0-2.25.0 with sql extension. JDK 8.

    Description

      When querying against a nested Row using the SqlTransform, if the object is too deeply nested (depth >4) it will cause the calcite ExpressionWriter to throw IndexOutOfBoundsException. This level of nesting is common with protobufs and similar.

       

       java.lang.IndexOutOfBoundsException: Index: 20, Size: 20
              at java.util.ArrayList.rangeCheck(ArrayList.java:659)
              at java.util.ArrayList.get(ArrayList.java:435)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.begin(ExpressionWriter.java:78)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.begin(ExpressionWriter.java:101)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:73)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.UnaryExpression.accept(UnaryExpression.java:49)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:80)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
              at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.AbstractNode.toString(AbstractNode.java:51)
              at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:215)
              at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:118)
              at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
              at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
              at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
              at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
              at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:134)
              at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
              at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
              at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
              at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:262)
              at com.etsy.r2k.unbounded.proc.sql.ProcessorTest.doTest(ProcessorTest.java:61)
              at com.etsy.r2k.unbounded.proc.sql.ProcessorTest.testRecursionError(ProcessorTest.java:37)
       

      Test Code:

      import org.apache.beam.sdk.extensions.sql.SqlTransform;
      import org.apache.beam.sdk.schemas.Schema;
      import org.apache.beam.sdk.testing.PAssert;
      import org.apache.beam.sdk.testing.TestPipeline;
      import org.apache.beam.sdk.testing.TestStream;
      import org.apache.beam.sdk.transforms.MapElements;
      import org.apache.beam.sdk.transforms.SerializableFunction;
      import org.apache.beam.sdk.values.PCollection;
      import org.apache.beam.sdk.values.PCollectionTuple;
      import org.apache.beam.sdk.values.Row;
      import org.apache.beam.sdk.values.TypeDescriptors;
      import org.junit.Before;
      import org.junit.Rule;
      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.junit.runners.JUnit4;
      
      @RunWith(JUnit4.class)
      public class ProcessorTest {
        @Rule public final TestPipeline pipeline = TestPipeline.create();
      
        @Before
        public void setup() {
          pipeline.enableAutoRunIfMissing(false).enableAbandonedNodeEnforcement(false);
        }
      
        @Test
        public void testRecursion() {
          doTest(4);
        }
      
        @Test
        public void testRecursionError() {
          // TODO file bug
          doTest(5);
        }
      
        private void doTest(final int depth) {
          Schema schema = Schema.builder().addStringField("foo").build();
      
          Row row = Row.withSchema(schema).addValue("your boat").build();
          for (int i = 0; i < depth; i++) {
            schema = Schema.builder().addRowField(String.format("foo%d", i), schema).build();
            row = Row.withSchema(schema).addValue(row).build();
          }
      
          final TestStream<Row> stream =
              TestStream.create(schema).addElements(row).advanceWatermarkToInfinity();
      
          final PCollection<Row> input = pipeline.apply(stream);
          final PCollectionTuple tuple = PCollectionTuple.of("a", input);
          final StringBuilder selector = new StringBuilder();
          for (int i = depth - 1; i >= 0; i--) {
            selector.append(".`foo").append(i).append('`');
          }
          final SqlTransform transform = SqlTransform.query("SELECT a" + selector + ".foo FROM a");
          final PCollection<String> results =
              tuple
                  .apply("query", transform)
                  .apply(
                      MapElements.into(TypeDescriptors.strings())
                          .via((SerializableFunction<Row, String>) r -> r.getString("foo")));
      
          PAssert.that(results).containsInAnyOrder("your boat");
          pipeline.run().waitUntilFinish();
        }
      }
      
      

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              linuxdaemon Aspen Barnes
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated: