Flink / FLINK-2167

Register HCatalog as external catalog in TableEnvironment

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 0.9
    • Fix Version/s: None
    • Component/s: Table API & SQL
    • Labels:

      Description

Register Hive's HCatalog as an external catalog in the TableEnvironment. After registration, Table API or SQL queries should be able to access tables registered in HCatalog.

        Issue Links

          Activity

          twalthr Timo Walther added a comment -

          Will work on this to make myself familiar with the Table API code.

          twalthr Timo Walther added a comment -

          I'm unable to connect my HCatInputFormat to the HCatalog metastore.

          Configuration config = new Configuration();
          config.addResource("/home/twalthr/flink/HCAT/apache-hive-1.2.0-bin/conf/hive-site.xml");
          env.createInput(new HCatInputFormat<HCatRecord>("default", "mytable", config)).print();
          

          Is there anything I have to consider? Any hints are highly appreciated!

          rmetzger Robert Metzger added a comment -

Hmm. Are there any error messages?
          Maybe you have to load the hive-site.xml through the classloader?
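
A minimal sketch of that suggestion, assuming hive-site.xml has been placed on the classpath (the resource lookup and file name below are illustrative, not the actual fix):

```scala
import org.apache.hadoop.conf.Configuration

// Resolve hive-site.xml through the context classloader instead of an absolute path.
val config = new Configuration()
val hiveSite = Thread.currentThread().getContextClassLoader.getResource("hive-site.xml")
if (hiveSite != null) {
  config.addResource(hiveSite) // Configuration.addResource also accepts a URL
}
```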

          twalthr Timo Walther added a comment -

Thanks Robert, I solved the issue. The hive-site.xml has to be on the classpath; nothing else works.

Aljoscha Krettek How should Scala users access "fromHCat()"? Shall I create a separate TableEnvironment for the Scala API? Should the ExecutionEnvironment be converted to a TableEnvironment implicitly?
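
For illustration only, the implicit-conversion variant could look roughly like this; the TableEnvironment(env) constructor and fromHCat() method are the names used in this thread, not a settled API:

```scala
import scala.language.implicitConversions
import org.apache.flink.api.scala.ExecutionEnvironment

// Hypothetical sketch: assumes a TableEnvironment(env) constructor and a
// fromHCat(database, table) method, as discussed in this thread.
object TableConversions {
  implicit def toTableEnvironment(env: ExecutionEnvironment): TableEnvironment =
    new TableEnvironment(env)
}

// With the implicit in scope, a Scala user could then write:
//   val table = env.fromHCat("default", "mytable")
```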

          In general: How should I create tests? Adding Hive as a dependency could be a little bit too much...

          aljoscha Aljoscha Krettek added a comment -

Does it not work if we simply use the same TableEnvironment? What is the result of the fromHCat() call?

          twalthr Timo Walther added a comment -

Sure, that also works. I just thought it would be confusing if a Scala program used a TableEnvironment that is located in an "XXX.java.XXX" package.

          The result would be a Table.

          aljoscha Aljoscha Krettek added a comment -

OK, maybe we could move it to another package?

          twalthr Timo Walther added a comment -

Moving it is also an option. I will implement what I think is best and then we can discuss afterwards, because many changes will be necessary.

E.g. I think it makes sense to pass the ExecutionEnvironment to the TableEnvironment; otherwise I can't figure out whether we are in the DataStream or DataSet context for the input format.

          aljoscha Aljoscha Krettek added a comment -

          Yes, this seems necessary.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the pull request:

          https://github.com/apache/flink/pull/1064#issuecomment-137080649

@jamescao: It seems that you also wrote tests for the HCatInputFormat, right? Is it possible to split the PR into an OutputFormat part and open a separate PR for the HCatInputFormat tests? I'm still working on FLINK-2167 and require HCatalog testing infrastructure; otherwise I have to write it on my own. Anyway, I wonder why none of the HCat I/O format classes have tests so far...

          githubbot ASF GitHub Bot added a comment -

          GitHub user twalthr opened a pull request:

          https://github.com/apache/flink/pull/1127

          FLINK-2167 [table] Add fromHCat() to TableEnvironment

          This PR introduces input format interfaces (so-called `TableSource`s) for the Table API. There are two types of TableSources:

• `AdaptiveTableSource`s can adapt their output to the requirements of the plan. Although the output schema stays the same, the TableSource can react to field resolution and/or predicates internally and can return adapted DataSet/DataStream versions in the "translate" step.
          • `StaticTableSource`s are an easy way to provide the Table API with additional input formats without much implementation effort (e.g. for fromCsvFile())

TableSources have been deeply integrated into the Table API.
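
Roughly, the two interfaces could be sketched like this; the method names are taken from or inferred from this description and the review diffs below (notifyResolvedField is a guess), so the actual PR code may differ:

```scala
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.expressions.Expression

trait TableSource {
  def getOutputFieldNames(): Seq[String]
}

// Fixed output: simply hands the Table API a DataSet to read from.
trait StaticTableSource extends TableSource {
  def createStaticDataSet(env: ExecutionEnvironment): DataSet[Row]
}

// Plan-aware: is notified about resolved fields and predicates before the
// DataSet is created in the "translate" step.
trait AdaptiveTableSource extends TableSource {
  def supportsResolvedFieldPushdown: Boolean
  def supportsPredicatePushdown: Boolean
  def notifyResolvedField(field: String, filtering: Boolean): Unit
  def notifyPredicates(expr: Expression): Unit
  def createAdaptiveDataSet(env: ExecutionEnvironment): DataSet[Row]
}
```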

          The TableEnvironment now requires the newly introduced `AbstractExecutionEnvironment` (common super class of all ExecutionEnvironments for DataSets and DataStreams).

An example of an AdaptiveTableSource can be found in `HCatTableSource`. HCatTableSource supports predicate pushdown as well as selection pushdown to HCatalog. Only predicates on partitioned columns are pushed to HCatalog. Unresolved fields will not be read from HCatalog and remain `null` within the Table API's rows.

A simple example looks like this:
          ```
TableEnvironment t = new TableEnvironment(env);
t.fromHCat("database", "table")
  .select("col1, col2")
  .filter("partCol==='5'");
          ```

          Here's what a TableSource can see from more complicated queries:

          ```
          getTableJava(tableSource1)
          .filter("a===5 || a===6")
          .select("a as a4, b as b4, c as c4")
          .filter("b4===7")
          .join(getTableJava(tableSource2))
          .where("a===a4 && c==='Test' && c4==='Test2'")

          // Result predicates for tableSource1:
          // List("a===5 || a===6", "b===7", "c==='Test2'")
          // Result predicates for tableSource2:
          // List("c==='Test'")
          // Result resolved fields for tableSource1 (true = filtering, false=selection):
          // Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), ("c", true))
          // Result resolved fields for tableSource2 (true = filtering, false=selection):
          // Set(("a", true), ("c", true))
          ```

HCatTableSource has no tests yet, but I will implement them soon. First, I would be happy about some general feedback.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/twalthr/flink TableApiHcat

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/1127.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1127


          commit f245604caccd8f97c1d6eabf16968dab3aa47572
          Author: twalthr <twalthr@apache.org>
          Date: 2015-07-09T09:57:05Z

          FLINK-2167 [table] Add fromHCat() to TableEnvironment


          githubbot ASF GitHub Bot added a comment -

          Github user chiwanpark commented on the pull request:

          https://github.com/apache/flink/pull/1127#issuecomment-140327645

Hi @twalthr, thanks for your contribution. But this PR contains many changes unrelated to the HCatalog format. Maybe we should split this PR into an HCatalog part and the other changes.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the pull request:

          https://github.com/apache/flink/pull/1127#issuecomment-140355813

Hi @chiwanpark, yes, I think splitting it up makes sense. I just opened this PR to get some feedback and to show why my changes are necessary to integrate new input formats like HCatalog. You can ignore the `HCatTableSource` class as it is still untested anyway.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40534193

          — Diff: flink-staging/flink-table/pom.xml —
          @@ -37,7 +37,7 @@ under the License.
<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
-	<version>${guava.version}</version>
+	<version>14.0.1</version>

— End diff –

          Why do you manually set the version here? Is it necessary because of something with HCat? @rmetzger what could we do in such a case?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40534229

          — Diff: flink-staging/flink-table/pom.xml —
          @@ -59,6 +59,12 @@ under the License.
</dependency>

<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-hcatalog</artifactId>
+	<version>${project.version}</version>
+</dependency>
+
+<dependency>
          — End diff –

          Is it necessary to have the hcatalog dependency here or could it be put into another package?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40534596

          — Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala —
@@ -54,6 +56,26 @@ class JavaBatchTranslator extends PlanTranslator

    Table(Root(rowDataSet, resultFields))
  }

+ override def createTable(tableSource: TableSource): Table = {
+   // a TableSource requires an ExecutionEnvironment
+   if (env == null) {
+     throw new InvalidProgramException("This operation requires an ExecutionEnvironment.")
+   }
+   if (tableSource.isInstanceOf[AdaptiveTableSource]) {
+     val adaptiveTs = tableSource.asInstanceOf[AdaptiveTableSource]
+     if (adaptiveTs.supportsResolvedFieldPushdown || adaptiveTs.supportsPredicatePushdown) {
+       Table(Root(adaptiveTs, adaptiveTs.getOutputFields()))
+     }
+     else {
+       Table(Root(adaptiveTs.createAdaptiveDataSet(env), adaptiveTs.getOutputFields()))
+     }
+   }
+   else {
+     val staticTs = tableSource.asInstanceOf[StaticTableSource]
+     createTable(staticTs.createStaticDataSet(env), staticTs.getOutputFieldNames().mkString(","))
+   }
          — End diff –

          You can replace the if/else by:

```scala
tableSource match {
  case adaptive: AdaptiveTableSource if adaptive.supportsResolvedFieldPushdown && ... => ...
  case adaptive: AdaptiveTableSource => ...
  case static: StaticTableSource => ...
  case _ => throw new Exception("something unexpected")
}
```

If I'm not mistaken, this is just off the top of my head.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40534836

          — Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala —
          @@ -118,6 +140,12 @@ class JavaBatchTranslator extends PlanTranslator {
          case Root(dataSet: JavaDataSet[Row], resultFields) =>
          dataSet

          + case Root(tableSource: AdaptiveTableSource, resultFields) =>
          + if (env == null) {
          + throw new InvalidProgramException("This operation requires an TableEnvironment.");
          — End diff –

          semicolon

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40535960

          — Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala —
          @@ -30,21 +33,43 @@ import org.apache.flink.api.table.trees.Rule

 * Rule that resolved field references. This rule verifies that field references point to existing
 * fields of the input operation and creates [[ResolvedFieldReference]]s that hold the field
 * [[TypeInformation]] in addition to the field name.
+ *
+ * @param inputOperation is optional but required if resolved fields should be pushed to underlying
+ *                       table sources.
+ * @param filtering defines if the field is resolved as a part of filtering operation or not
  */
-class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])])
+class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])],
+    inputOperation: PlanNode,
+    filtering: Boolean)
— End diff –

          It is Scala Style to indent the parameters by 4 spaces (even the first parameter) if they don't fit on a line, like so:
          ```scala
          class ResolveFieldReferences(
    inputFields: Seq[(String, TypeInformation[_])],
    inputOperation: PlanNode,
    filtering: Boolean) extends Rule[Expression] {
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40536239

          — Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicatePushdown.scala —
          @@ -0,0 +1,124 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.table.expressions.analysis
          +
          +import org.apache.flink.api.table.expressions._
          +import org.apache.flink.api.table.expressions.analysis.FieldBacktracker
          +.resolveFieldNameAndTableSource
          +import org.apache.flink.api.table.expressions.analysis.PredicateFilter.pruneExpr
+import org.apache.flink.api.table.input.{AdaptiveTableSource, TableSource}

          +import org.apache.flink.api.table.plan._
          +import org.apache.flink.api.table.trees.Rule
          +
          +import scala.collection.mutable.ArrayBuffer
          +
          +/**
          + * Pushes constant predicates (e.g. a===12 && b.isNotNull) to each corresponding
          + * AdaptiveTableSource that support predicates.
          + */
          +class PredicatePushdown(val inputOperation: PlanNode) extends Rule[Expression] {
          +
          + def apply(expr: Expression) = {
          + // get all table sources where predicates can be push into
          + val tableSources = getPushableTableSources(inputOperation)
          +
          + // prune expression tree such that it only contains constant predicates
          + // such as a=1,a="Hello World", isNull(a) but not a=b
          + val constantExpr = pruneExpr(isResolvedAndConstant, expr)
          +
          + // push predicates to each table source respectively
          + for (ts <- tableSources) {
          + // prune expression tree such that it only contains field references of ts
          + val tsExpr = pruneExpr((e) => isSameTableSource(e, ts), constantExpr)
          +
          + // resolve field names to field names of the table source
+ val result = tsExpr.transformPost {
+   case rfr@ResolvedFieldReference(fieldName, typeInfo) =>
+     ResolvedFieldReference(
+       resolveFieldNameAndTableSource(inputOperation, fieldName)._2,
+       typeInfo
+     )
+ }

          + // push down predicates
+ if (result != NopExpression()) {
+   ts.notifyPredicates(result)
+ }

          + }
          + expr
          + }
          +
          + // ----------------------------------------------------------------------------------------------
          +
          + /**
          + * @return all AdaptiveTableSources the given PlanNode contains
          + */
          + def getPushableTableSources(tree: PlanNode): Seq[AdaptiveTableSource] = tree match {
          + case Root(ts: AdaptiveTableSource, _) if ts.supportsPredicatePushdown() => Seq(ts)
          + case pn:PlanNode =>
          + val res = new ArrayBuffer[AdaptiveTableSource]()
          + for (child <- pn.children) res ++= getPushableTableSources(child)
          + res
          — End diff –

          This can be replaced by:
          ```scala
pn.children flatMap { child => getPushableTableSources(child) }
```
If I'm not mistaken, this seems more Scala-y :smile:

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40536932

          — Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateFilter.scala —
          @@ -0,0 +1,88 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.table.expressions.analysis
          +
          +import org.apache.flink.api.table.expressions._
          +
          +object PredicateFilter {
          — End diff –

          Maybe this should be called PredicatePruner

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the pull request:

          https://github.com/apache/flink/pull/1127#issuecomment-143703367

          Hi,
          I like the work. :smile:

Some remarks about Scala style: we are trying to make our Scala code more consistent. I didn't comment on everything, just one particular pattern of code that could be changed.

          Now, for the technical part, you are not actually removing the predicates that are pushed down from an expression, correct? This should be alright, since the result will still be correct. It might just be some future optimization, but I also think that the cost of evaluating the predicate is negligible. The real improvement comes from early filtering in the sources, as you implemented.

Then, why do you have the `supports*` methods in `AdaptiveTableSource`? Couldn't these methods just do nothing in case the source does not support the feature? Or maybe return false if the pushdown was not successful? (I also wonder why you differentiate between AdaptiveTableSources that support pushdown and those that don't in `JavaBatchTranslator.createTable`. I think you do it so that nothing gets pushed to sources that don't support it, but this distinction might not be necessary, as mentioned above.)

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40540138

          — Diff: flink-staging/flink-table/pom.xml —
          @@ -37,7 +37,7 @@ under the License.
<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
-	<version>${guava.version}</version>
+	<version>14.0.1</version>

— End diff –

          Yes, I had method signature problems with the HCat input format. We are using a very old version of HCat and therefore need a very old version of guava.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40540522

          — Diff: flink-staging/flink-table/pom.xml —
          @@ -59,6 +59,12 @@ under the License.
</dependency>

<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-hcatalog</artifactId>
+	<version>${project.version}</version>
+</dependency>
+
+<dependency>
— End diff –

What do you mean by "put into another package"? Do you mean putting Table API classes in the `flink-hcatalog` package? I think that's a general decision we have to make. Or do we want additional Maven Table API I/O format modules for Parquet, HCat, etc.?

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the pull request:

          https://github.com/apache/flink/pull/1127#issuecomment-143718074

Thanks @aljoscha for reviewing my code! Sorry for the bad Scala style. This was the first really large Scala project I have written (not just small scripts); I'm still learning by doing. I will correct the issues you mentioned.

For the technical part: Yes, I don't modify the expression tree; I'm just giving the sources the possibility to adapt to the needs of the program. Regarding the `supports*` methods, yes, you are right, they are not strictly necessary, but I thought it makes sense for possible future table sources to check that in advance. In some cases it also reduces the number of method calls, but I can also remove the `supports*` methods for the sake of simplicity, no problem.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40545360

          — Diff: flink-staging/flink-table/pom.xml —
          @@ -59,6 +59,12 @@ under the License.
</dependency>

<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-hcatalog</artifactId>
+	<version>${project.version}</version>
+</dependency>
+
+<dependency>
          — End diff –

I meant putting the HCat-specific stuff into a different package. But I now realize that we could not have the `fromHCat` call on `TableEnvironment` then. So this is OK, I guess.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40545392

          — Diff: flink-staging/flink-table/pom.xml —
          @@ -37,7 +37,7 @@ under the License.
<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
-	<version>${guava.version}</version>
+	<version>14.0.1</version>

— End diff –

          But then this could lead to problems with shading. Let's wait for @rmetzger on this, he probably knows this stuff best.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the pull request:

          https://github.com/apache/flink/pull/1127#issuecomment-143728302

Don't worry, everyone was a Scala beginner once. :smiley:

          You can also leave the `supports*` methods in if you think that they might be necessary in the future.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40674158

          — Diff: flink-staging/flink-table/pom.xml —
          @@ -37,7 +37,7 @@ under the License.
<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
-	<version>${guava.version}</version>
+	<version>14.0.1</version>

— End diff –

          What exactly was the problem with the hcat input format and guava?
          If their guava version is really incompatible with ours, we can do the following:

          • Use a newer hcat version
• Create a special shaded-hcat Maven module which shades away HCat's guava.
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1127#discussion_r40674516

          — Diff: flink-staging/flink-table/pom.xml —
          @@ -37,7 +37,7 @@ under the License.
<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
-	<version>${guava.version}</version>
+	<version>14.0.1</version>

— End diff –

Thanks so far. I will look into this issue again and report back.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the pull request:

          https://github.com/apache/flink/pull/1127#issuecomment-145001030

          Again, thanks for the feedback.
I will close this PR and open two separate PRs for TableSources and the HCatInputFormat.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr closed the pull request at:

          https://github.com/apache/flink/pull/1127

          fhueske Fabian Hueske added a comment -

          I updated the title and description of this JIRA to reflect the changes in the Table API architecture.


            People

• Assignee: Unassigned
• Reporter: fhueske Fabian Hueske
• Votes: 1
• Watchers: 6
