  1. Flink
  2. FLINK-14953

Parquet table source should use schema type to build FilterPredicate

Details

    Description

      The issue occurs when the data type of a predicate value inferred from SQL does not match the Parquet schema. For example, if foo is a column of long type and the predicate is foo < 1, the literal 1 is recognized as an integer. As a result, the Parquet FilterPredicate is mistakenly created for a column of Integer type. When the reader then encounters a long value in that column, the following exception is thrown:

      java.lang.UnsupportedOperationException
      at org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)
      at org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)
      at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)
      at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
      at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
      at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:235)
      at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
      at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
      at org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:231)
      at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:219)
      at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:155)
      at org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
      at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
      at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131)
      at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:182)
      at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:158)
      at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131)
      at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:115)
      at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:38)
      at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:52)
      at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
      at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
      at org.apache.flink.formats.parquet.ParquetTableSourceITCase.testScanWithProjectionAndFilter(ParquetTableSourceITCase.java:91)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
      at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
      at org.junit.rules.RunRules.evaluate(RunRules.java:20)
      at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
      at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
      at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
      at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
      at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
      at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
      at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
      at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
      at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
      at org.junit.rules.RunRules.evaluate(RunRules.java:20)
      at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
      at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
      at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
      at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
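
      The fix proposed in the title can be illustrated with a minimal, dependency-free sketch: look up the column's type in the schema and convert the literal to that type before building the predicate. Everything here (SchemaTypedLiteral, ColumnType, coerce) is a hypothetical stand-in and not the actual Flink or Parquet code. With the real Parquet API, the coerced value would presumably be passed to something like FilterApi.lt(FilterApi.longColumn("foo"), 1L) instead of the int-typed variant.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not Flink or Parquet code: it only illustrates picking
// the literal's Java type from the column's schema type rather than from SQL.
final class SchemaTypedLiteral {

    // Simplified stand-in for the Parquet primitive types relevant here.
    enum ColumnType { INT32, INT64, FLOAT, DOUBLE }

    // Converts a numeric literal to the boxed type that matches the column type.
    // Note: narrowing (e.g. a large long to INT32) truncates silently in this
    // sketch; real code would need a range check.
    static Number coerce(Number literal, ColumnType columnType) {
        switch (columnType) {
            case INT32:
                return Integer.valueOf(literal.intValue());
            case INT64:
                return Long.valueOf(literal.longValue());
            case FLOAT:
                return Float.valueOf(literal.floatValue());
            case DOUBLE:
                return Double.valueOf(literal.doubleValue());
            default:
                throw new IllegalArgumentException("Unsupported column type: " + columnType);
        }
    }

    public static void main(String[] args) {
        // Assumed schema: column foo is stored as a 64-bit integer.
        Map<String, ColumnType> schema = new LinkedHashMap<>();
        schema.put("foo", ColumnType.INT64);

        // The literal in `foo < 1` as inferred from SQL: an Integer.
        Number literal = Integer.valueOf(1);

        // Coerce using the schema type, so the predicate would be built for a long column.
        Number typed = coerce(literal, schema.get("foo"));
        System.out.println(literal.getClass().getSimpleName()
                + " -> " + typed.getClass().getSimpleName()); // prints "Integer -> Long"
    }
}
```

      Driving the conversion from the schema rather than from the literal keeps the predicate's value type aligned with what the column reader delivers, which is the mismatch behind the UnsupportedOperationException raised from FilteringPrimitiveConverter.addLong above.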

            People

              ZhenqiuHuang Zhenqiu Huang

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 10m