Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      The TableSource interface currently only supports the definition of flat rows.

      However, there are several storage formats for nested data that should be supported, such as Avro, JSON, Parquet, and ORC. The Table API and SQL can also natively handle nested rows.

      The TableSource interface and the code to register table sources in Calcite's schema need to be extended to support nested data.

        Issue Links

          Activity

          ivan.mushketyk Ivan Mushketyk added a comment -

          If this is not urgent and nobody has started working on this, I can give it a try.

          fhueske Fabian Hueske added a comment -

          Sure, you can work on this Ivan Mushketyk.
          I think it makes sense to first design the interfaces before we start with the actual implementation.

          Our goal should be to change as little as possible on the current interfaces. It should still be possible to define TableSources for tables with flat schema in an easy way.

          I would propose the following:

          • create a FlatTableSource interface and move the TableSource.getFieldNames() and TableSource.getFieldTypes() methods there. The TableSource.getNumberOfFields() method can be dropped.
          • create a NestedTableSource interface that provides methods to derive a nested schema (field names and types). We need to decide how this should look.
          • Change all classes that currently implement TableSource to also implement FlatTableSource. NestedTableSource will be used, for instance, for the Avro table source.
          • We need to modify / extend the way that table sources are currently registered. First, we need to distinguish flat and nested sources. For the nested sources we need an implementation that converts the information of the NestedTableSource interface into the RelDataType required by Calcite's Table interface (see FlinkTable).

          What do you think?

          ivan.mushketyk Ivan Mushketyk added a comment -

          Thank you Fabian Hueske.

          The general idea seems feasible, but I still have a few conceptual questions:

          • Isn't FlatTableSource just an edge case of a NestedTableSource?
          • Why do we need two different table source types in the first place? Is it because we need to register flat schemas and nested schemas in different ways? If so, would a flag/enum be a better alternative to distinguish between flat/nested types?

          I also have a more technical question. I spent some time looking through the flink-table and Calcite sources, but I wasn't able to find an example of how to register a nested field in Calcite (I assume we don't do this already). Do you know where I can find an example of this?

          fhueske Fabian Hueske added a comment -

          Thanks for the questions Ivan Mushketyk.

          It is true, FlatTableSource would be a special case of NestedTableSource. I proposed this structure mainly to limit the changes to the API. On the other hand, I don't think that this interface has been used a lot, and it is rather an internal interface. Flat and nested tables are registered the same way; however, the definition of a nested schema is a bit more complex. If we design a good API for nested schemas, we can also keep a single interface.

          I think we do register tables with nested schema when a DataSet with nested types (such as a Tuple nested in a Tuple) is converted into a Table.
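          For illustration, a minimal sketch of such a conversion (a sketch only; the imports reflect the Flink 1.2 package layout and may differ in other versions):

          import org.apache.flink.api.scala._
          import org.apache.flink.table.api.TableEnvironment
          import org.apache.flink.table.api.scala._

          // A DataSet with a nested tuple keeps its nested schema when it is
          // converted into a Table; nothing is flattened at this point.
          object NestedDataSetToTable {
            def main(args: Array[String]): Unit = {
              val env = ExecutionEnvironment.getExecutionEnvironment
              val tableEnv = TableEnvironment.getTableEnvironment(env)
              val data = env.fromElements((1, ("a", 2L)), (2, ("b", 3L)))
              val table = tableEnv.fromDataSet(data) // schema: _1: Int, _2: (_1: String, _2: Long)
            }
          }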

          ivan.mushketyk Ivan Mushketyk added a comment -

          Hi Fabian Hueske,

          Thank you for your comments. It's much clearer now, but it seems that I am either still missing something obvious or the task is more involved than described.

          Let me first describe how I understand this issue so that you can correct me.

          The goal of this task is to support nested data structures. This means that if we have a type definition like this:

          class ParentPojo {
            ChildPojo child;
            int num;
          }
          
          class ChildPojo {
            String str;
          }
          

          and we have a TableSource that returns a dataset of ParentPojo, we can access nested fields in SQL queries, something like:

          SELECT * FROM pojos WHERE child.str LIKE '%Rubber%'
          

          In this case child.str is a way to access a nested field.

          The first thing that confuses me is that the current SQL grammar does not seem to support any nested field access, but I think that may be a relatively minor nuisance.

          If I understand it correctly, flink-table internally converts any input into a dataset of Rows and then performs operations on it. To convert a nested ParentPojo into a flat schema, we can extract all leaf values into two columns:

          child.str num
          

          similarly to how Parquet identifies columns in nested types (see the following slide)

          Now, here is where this becomes more interesting. If I understand it correctly, BatchScan#convertToExpectedType is used to convert an input dataset into a dataset of Rows. For this task it generates a mapper function in FlinkRel#getConversionMapper, which then calls CodeGenerator#generateConverterResultExpression.

          So in our case it should generate code similar to something like:

          public Row map(ParentPojo parent) {
          	Row row = new Row(2);
          	row.setField(0, parent.child.str);
          	row.setField(1, parent.num);
          
          	return row;
          }
          

          CodeGenerator accepts fieldNames and an optional POJO field mapping to generate accessors. It seems that the main work is performed in CodeGenerator#generateFieldAccess, which generates access code for the different fields of the POJO, but it does not create any code that accesses nested fields; it only generates access code for a top-level POJO field with the corresponding field name.

          Therefore, if I understand this correctly, we need to start with updating CodeGenerator to generate nested accessors and then we can extend TableSource to support nested data.

          Am I overthinking this issue? Or am I missing something obvious?

          jark Jark Wu added a comment -

          Hi Ivan Mushketyk, I will try to answer your question.

          The main thing confusing you is whether CodeGenerator supports nested access. Actually, this has been fixed in FLINK-4294; you can have a look at the test example CompositeAccessTest. I think it will give you some inspiration.

          Regarding the other question, BatchScan#convertToExpectedType will convert the input dataset into a Row type. It will not flatten the nested fields, but keep the same schema in the Row. In your case, ParentPojo will be converted to a Row type of Row(child: ChildPojo, num: Int).
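          As a small illustration of this (a hand-written sketch; the actual conversion is code-generated, and it assumes ParentPojo's fields are accessible):

          import org.apache.flink.types.Row

          object PojoToRowSketch {
            // Sketch of the conversion described above: one level of unpacking
            // only; the ChildPojo stays a POJO inside the Row.
            def toRow(p: ParentPojo): Row = {
              val row = new Row(2)
              row.setField(0, p.child) // still a ChildPojo, not flattened
              row.setField(1, p.num)
              row
            }
          }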

          Hope that will help you.

          fhueske Fabian Hueske added a comment - - edited

          Hi Ivan Mushketyk,

          First of all, you are completely right about the motivation for this issue. The goal is to add support for TableSources that return nested data just as in your example.
          The Table API and SQL do support accessing nested fields since FLINK-4249, as Jark Wu mentioned (see also the "Value access functions" at the end of the built-in SQL functions section in the documentation).

          As Jark Wu described, only the first level is flattened, i.e., the functions do not handle custom POJOs but only Rows (with possibly nested Pojos).

          The actual challenge is that the TableSource would need to provide names for the nested fields. This is not possible with the current interface (without some implicit conventions) because the names are returned as a flat String array.
          Take the example of a TableSource that reads Avro data. Either the data is read as a specific object, in which case we are good, because these are concrete, code-generated POJOs which the Table API can directly process. Or, in the case of a generic record, the data is held in a Map<String, String> and accessed using a schema. A TableSource would need to convert such a generic record into a nested data structure (for example nested Rows) and assign the names for the nested attributes. This is currently not possible.

          One possible solution would be to add support for names in RowTypeInfo and only declare the names of the first-level attributes via the current interface and all nested fields via the RowTypeInfo.
          What I don't like about this approach is that the field names are hidden in the TypeInformation. But maybe that's the best approach because we can keep the interface and every TableSource could fall back to returning rows.
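          For illustration, a sketch of what such a names-aware RowTypeInfo could look like for the ParentPojo example above; the constructor taking field names is hypothetical at this point (it is exactly what would need to be added):

          import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
          import org.apache.flink.api.java.typeutils.RowTypeInfo

          object NestedRowSchemaSketch {
            // Hypothetical RowTypeInfo constructor taking custom field names;
            // the nested schema is row(child: row(str: String), num: Int).
            val childType = new RowTypeInfo(
              Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO),
              Array("str"))
            val parentType = new RowTypeInfo(
              Array[TypeInformation[_]](childType, BasicTypeInfo.INT_TYPE_INFO),
              Array("child", "num"))
          }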

          ivan.mushketyk Ivan Mushketyk added a comment -

          Hi Fabian, Jark,

          Thank you for all your comments and for your patience.

          Let me try to propose a solution and see if this will work.

          I performed a simple test using TableSource, and it seems that we can access nested fields. Here is my test TableSource that returns POJOs:

          https://gist.github.com/mushketyk/acffb701a1f71a6e9bd661c781d7b18c

          And here is a test code that uses it:

          ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
          BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
          
          tableEnv.registerTableSource("MyTable", new TestBatchTableSource());
          
          Table result = tableEnv
          	.sql("SELECT MyTable.amount * MyTable.id, MyTable.name, MyTable.childPojo.child.str FROM MyTable WHERE amount < 4");
          
          DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
          List<Row> results = resultSet.collect();
          

          And the result of the test seems feasible:

          [0,pojo16,mostChildPojo16, 0,pojo32,mostChildPojo32, 1,pojo1,mostChildPojo1, 17,pojo17,mostChildPojo17, 33,pojo33,mostChildPojo33, 36,pojo18,mostChildPojo18, 4,pojo2,mostChildPojo2, 57,pojo19,mostChildPojo19, 9,pojo3,mostChildPojo3]
          

          Since we can access nested fields, it looks like we only need to convert the first level of fields into a Row. The result Row will contain potentially nested POJOs, but this does not seem to be an issue. I don't see why we need to go beyond one level of unpacking when we create a Row, so I will assume this is all we need.

          To do this, we need to specify how each field of a result Row should be extracted from the TableSource's type T. We can add a new method called getFieldMapping that returns an array of strings. The String at position i is the name of the field that should be accessed to get the i-th Row field value. For the example table source above, it could be implemented simply like this:

          @Override
          public String[] getFieldMapping() {
          	return new String[]{"amount", "childPojo", "id", "name"};
          }
          

          This means that to get the value of the 0th field in the result Row we need to access the field "amount", for the 1st field the field "childPojo", and so on.
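          For illustration, a reflection-based sketch of how this mapping could be applied (toRow is a hypothetical helper; the real conversion would be code-generated):

          import org.apache.flink.types.Row

          object FieldMappingSketch {
            // Copy the fields named by `mapping` into a Row, position by position.
            // Reflection is used only to keep the sketch short.
            def toRow(pojo: AnyRef, mapping: Array[String]): Row = {
              val row = new Row(mapping.length)
              mapping.zipWithIndex.foreach { case (fieldName, i) =>
                val field = pojo.getClass.getDeclaredField(fieldName)
                field.setAccessible(true)
                row.setField(i, field.get(pojo))
              }
              row
            }
          }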

          If we need to convert an indexable type like a tuple or an array, we do not need this mapping; in this case, we can return null or an empty array. Optional would be a better option, but I think Flink should still work with both Java 7 and Java 8.

          The only problem with this approach is that the FlinkTable class accepts an array of field indexes that is used to convert values from the original type into a Row:

          abstract class FlinkTable[T](
              val typeInfo: TypeInformation[T],
              val fieldIndexes: Array[Int],
              val fieldNames: Array[String])
            extends AbstractTable {
            ...
          }
          

          So to work around this I propose to change this to:

          abstract class FlinkTable[T](
              val typeInfo: TypeInformation[T],
              val fieldIndexes: Array[Int],
              val fieldMappings: Optional[Array[String]], // <--- New argument
              val fieldNames: Array[String])
            extends AbstractTable {
            ...
          }
          

          We can then use these fieldMappings in CodeGenerator to generate a proper mapper.

          This will technically make it possible to convert a GenericRecord into a Row. But since GenericRecord implements Avro's interfaces, do we need to add a dependency on Avro in flink-table to access these fields? Or should we use reflection to access these methods? Or should we ignore the GenericRecord case altogether and simply return Row from KafkaTableSource?

          I also wonder why we need this method in the TableSource interface:

            /** Returns the number of fields of the table. */
            def getNumberOfFields: Int
          

          and I wonder if we can drop it.

          What do you think about it? Am I missing something?

          jark Jark Wu added a comment -

          Hi Ivan Mushketyk, thanks for your detailed and clear proposal.

          Regarding the new argument fieldMappings in FlinkTable, I think it plays the same role as fieldIndexes. Actually, fieldIndexes is the inputPojoFieldMapping in CodeGenerator when converting. In the case of a POJO, fieldIndexes is a field mapping; in other cases, it is an array of 0 to n.

          Regarding getNumberOfFields in TableSource: yes, it is rarely used and can be replaced by getFieldsNames.length if getFieldsNames still displays the first-level attributes.

          Hi Fabian Hueske, I agree with the RowTypeInfo approach, which I think is similar to Calcite's way. But we should support custom field names in RowTypeInfo first.

          ivan.mushketyk Ivan Mushketyk added a comment -

          Hi Jark Wu, thank you for your reply.

          I see that fieldIndexes plays a similar role and that it is used in CodeGenerator, but it is not clear what the order of these mapping ids should be. If you take a look at the test POJO TableSource that I've created:
          https://gist.github.com/mushketyk/acffb701a1f71a6e9bd661c781d7b18c

          you can see that the order of fields returned by getFieldsNames is pretty much random:

          @Override
          public String[] getFieldsNames() {
          	return new String[]{"amount", "childPojo", "id", "name"};
          }
          

          It does not match the field order of either the POJO class definition or the POJO type information. I pretty much had to come up with it by trial and error, because (if I understand it correctly) there is no explicit convention on what this order should be. That's why I am proposing to add a new method that helps establish a clear relationship between field ids in the result Row and fields in the original POJO type.

          I also think that using fieldIndexes will be an issue if we want to support conversion from types like GenericRecord that provide a Map-like interface with get and put methods that do not have explicit ordering.

          Does it make sense? Or am I trying to solve a wrong problem?

          Jark Wu, Fabian Hueske, could you please describe your idea regarding the RowTypeInfo approach in more detail? I don't think I understand what you propose to do.

          jark Jark Wu added a comment -

          Hi Ivan Mushketyk,

          Yes, you are right. In your case, the POJO TableSource's fieldIndexes is not clear. But we can use getFieldsNames and getResultType to generate the fieldIndexes. So the new getFieldMapping still duplicates getFieldsNames, am I right?

          I don't know much about GenericRecord; maybe Fabian Hueske can answer your question. Does GenericRecord have an immutable schema, or can it change with every record?

          IMO, the TableSource interface can be simplified to this:

          trait TableSource[T] {
            /** Return this table source's row type. The returned RowTypeInfo is a composite type and can have
            * nested types whose fields describe the names and types of the columns in this table. */
            def getReturnType: RowTypeInfo
          }
          

          getReturnType is forced to return a RowTypeInfo, which describes the first-level field names and types (possibly nested), so that we can support nested data in TableSource. But currently RowTypeInfo doesn't support custom field names, so we should fix that first.

          And the original getNumberOfFields, getFieldsNames, and getFieldTypes methods in TableSource could be removed, as they can be derived from the returned RowTypeInfo. The result would be similar to Calcite's Table interface, which actually only has a single RelDataType getRowType(RelDataTypeFactory typeFactory) method to implement.
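          To make this concrete, a hypothetical source against the simplified trait (the class name and the names-aware RowTypeInfo constructor are assumptions, not existing API):

          import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
          import org.apache.flink.api.java.typeutils.RowTypeInfo
          import org.apache.flink.types.Row

          // A source whose whole schema, including the nested part, is declared
          // through the single getReturnType method of the proposed trait.
          class TestNestedTableSource extends TableSource[Row] {
            override def getReturnType: RowTypeInfo = {
              val childType = new RowTypeInfo(
                Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO),
                Array("str"))
              new RowTypeInfo(
                Array[TypeInformation[_]](childType, BasicTypeInfo.INT_TYPE_INFO),
                Array("child", "num"))
            }
          }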

          What do you think, Fabian Hueske, Ivan Mushketyk?

          ivan.mushketyk Ivan Mushketyk added a comment -

          Hi Jark Wu

          Your suggestion makes sense to me.

          >> But currently, the RowTypeInfo doesn't support custom field names, so we should fix that first.
          Can we do it as part of this issue?

          jark Jark Wu added a comment -

          Row and RowTypeInfo have been moved to flink-core, so I would suggest doing it in a separate issue. I created FLINK-5348 to fix it.

          ivan.mushketyk Ivan Mushketyk added a comment -

          Thank you for working on it. I'll start working on this task when FLINK-5348 is finished.

          fhueske Fabian Hueske added a comment - - edited

          Sorry for my late response. I'll try to answer your questions and will comment on some of your ideas:

          • The order of fields in a PojoTypeInfo depends on the order in which the fields are returned by Java's reflection interfaces. I think the order is lexicographic. But as Jark Wu said, we can get the indexes of the fields via PojoTypeInfo.getFieldIndex(). This will give us the right indexes.
          • We do not need (and don't want to have) an Avro dependency in flink-table. The mapping should happen in the TableSource which should be located in a connector Maven module.
          • A Generic Avro record is a generic holder for data of any schema. The data of a generic record object is interpreted using an Avro Schema. The Schema would give us the required field names and types. Using the schema, we could construct a Row with possibly nested Rows and move all data from a generic record into a nested Row object (see the sketch after this list).
          • TableSource.getNumberOfFields() can be dropped. The question is whether this is important enough to break the API. If we decide to touch the interface, I'm +1 to remove it.
          • I'm not sure about requiring that a TableSource must return a Row. In the case of a specific Avro record, we would need an additional step to copy the first-level Pojo fields into a Row, which would need some reflection or code generation, instead of simply forwarding the Avro object. We could still allow any kind of type information and use the field names provided by the TypeInformation. If the return type is a Pojo, we would use its field names. If the return type is a Tuple, the fields would be named `f0`, `f1`, .... If this is not desired, the TableSource could return Rows. If we want to rename fields, we have to use Row as well.
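          As mentioned in the list above, a generic record could be copied into nested Rows using its Schema. A rough sketch of such a converter (the class name is an assumption, and it would live in a connector module, not in flink-table; unions, arrays, and maps are ignored here):

          import org.apache.avro.Schema
          import org.apache.avro.generic.GenericRecord
          import org.apache.flink.types.Row

          import scala.collection.JavaConverters._

          object AvroRowConverter {
            // Recursively copy a generic Avro record into a (possibly nested) Row,
            // using the Avro Schema to resolve field names and nested records.
            def convert(record: GenericRecord, schema: Schema): Row = {
              val fields = schema.getFields.asScala
              val row = new Row(fields.size)
              fields.zipWithIndex.foreach { case (field, i) =>
                val value = record.get(field.name)
                if (field.schema.getType == Schema.Type.RECORD) {
                  // nested record -> nested Row
                  row.setField(i, convert(value.asInstanceOf[GenericRecord], field.schema))
                } else {
                  row.setField(i, value)
                }
              }
              row
            }
          }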
          ivan.mushketyk Ivan Mushketyk added a comment -

          Hi Fabian,

          Thank you for your reply.

          At first a question about your comment.

          In case of a Specific Avro record, we would need an additional step to copy the first-level Pojo fields into a Row

          Does "Specific Avro" mean a regular POJO?

          Regarding the TableSource interface, I think I've lost track of what problem we are trying to solve here.

          I see the following problems with the current interface:

          • There is no explicit relationship between field positions in a Row and the order of fields in a POJO type. As you mentioned, we can get the field order via PojoTypeInfo.getFieldIndex(). Since TableSource has a getReturnType method that returns TypeInformation, nothing needs to be changed in the TableSource interface to support this.
          • The Row type does not have field names, which makes it problematic to access nested fields in nested Rows, but I believe this will be fixed in FLINK-5348.

          Therefore it seems that the only thing that needs to be done (besides waiting for FLINK-5348 to be implemented) is to update TableSourceTable to use POJO fields in the correct order. Currently, it just generates indexes 0 to n:

          class TableSourceTable(val tableSource: TableSource[_])
            extends FlinkTable[Row](
              typeInfo = new RowTypeInfo(tableSource.getFieldTypes),
              fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray,
              fieldNames = tableSource.getFieldsNames)
          

          while it should use the PojoTypeInfo.getFieldIndex() method to build a proper list of field indexes.
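          For example, a sketch of how the indexes could be derived (assuming the source's return type is a PojoTypeInfo):

          import org.apache.flink.api.java.typeutils.PojoTypeInfo

          object FieldIndexSketch {
            // Map each declared field name to its position in the PojoTypeInfo,
            // instead of assuming the identity mapping 0..n-1.
            def pojoFieldIndexes(pojoType: PojoTypeInfo[_], fieldNames: Array[String]): Array[Int] =
              fieldNames.map(name => pojoType.getFieldIndex(name))
          }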

          Am I missing something? Are there some TableSource limitations that I am missing?

          fhueske Fabian Hueske added a comment -

          Our discussion has changed a bit how I see the problem that needs to be fixed.

          Let's start with the current interface of TableSource and its purpose:

          A TableSource defines the schema of a table which is internally produced as a DataSet (BatchTableSource) or DataStream (StreamTableSource). A TableSource basically converts a DataSet or DataStream into a Table internally. TableEnvironments have explicit methods to convert a DataSet or DataStream into a Table. There are two variants: 1) Without providing field names (example: java.BatchTableEnvironment.fromDataSet(DataSet dataset)). In this case the names are extracted from the TypeInformation of the DataSet or DataStream ("f0", "f1", ... for a Tuple, the field names of a Pojo or case class, etc.). 2) With parameters to specify the first-level field names (example: java.BatchTableEnvironment.fromDataSet(DataSet dataset, String fields)). In the case of a DataSet<Pojo>, the fields cannot be named by position; each existing field must be renamed to have a clear mapping.

          The current interface has mainly three methods to describe the schema of the returned table:

          • TypeInformation<T> getReturnType() returns the type of the DataSet or DataStream produced by the TableSource.
          • String[] getFieldNames() returns the names of the first level of fields. The names are similar to the field names specified when converting a DataSet or DataStream into a Table.
          • TypeInformation[] getFieldTypes() returns the types of the first level of fields as Flink TypeInformation. These types are actually included in the return type, which is usually a CompositeType.

          I see the following issues with the current interface:

          • getFieldTypes() is kind of redundant with getReturnType()
          • getFieldNames and getFieldTypes must match the order of fields specified by the TypeInformation returned by getReturnType(). This is especially tricky for Pojos, where the order of fields is not explicit.
          • getFieldNames only allows specifying the names of the first level (the same as for fromDataSet() or fromDataStream()). Hence, it is not possible to rename nested fields.

          By following Jark Wu's proposal and only using getReturnType() we would address these issues as follows:

          • getFieldTypes() and getFieldNames() would be removed and thus no longer be redundant with and implicitly tied to getReturnType(). getNumberOfFields() would be removed as well.
          • We would extract the Table schema only from the TypeInformation returned by getReturnType(). We can use existing logic for that (see fromDataSet(), fromDataStream()). Hence the behavior would be consistent with other parts of the API, which is very good, IMO.
          • By extracting the schema from the return type, we cannot override the field names for types with fixed field names (Tuple, Pojo, CaseClass). If a TableSource returns a Tuple, the fields will be named "f0", "f1", .... If the TableSource returns a Pojo or CaseClass, the fields will be named like the fields in the Pojo.
          • By fixing FLINK-5348 and extending RowTypeInfo to support custom field names, TableSources can use this type if they need to define custom field names. The returned type must be a Row then.
          • Since we do not allow to rename fields, we do not have to care about mapping names to fields (no problem with the field order of Pojos). The order of fields is not important for the Table API and SQL.

          To summarize, I think Jark Wu's proposal is very good and is the way to go. It would make the TableSource interface better because

          • it is consistent with other parts of the API
          • it makes the interface leaner
          • together with FLINK-5348, we can do everything that we need.

          A specific Avro record is a Pojo which was code-generated from an Avro schema. Flink can handle these generated classes as regular Pojos using the PojoTypeInfo.

          jark Jark Wu added a comment -

          Thank you Fabian Hueske for summarizing this, makes sense to me.

          ivan.mushketyk Ivan Mushketyk added a comment -

          Hi Fabian Hueske

          Thank you for the great and detailed proposal. It all makes sense.

          One question came to my mind while I was working on it: would it be better to leave all current methods in the TableSource interface and simply implement them using the return type? Something like:

          trait TableSource[T] {
          
            /** Returns the number of fields of the table. */
            final def getNumberOfFields: Int = {
              getReturnType.getTotalFields
            }
          
            /** Returns the names of the table fields. */
            final def getFieldsNames: Array[String] = {
                getReturnType match {
                  case c: CompositeType[T] => c.getFieldNames
                }
            }
          
            /** Returns the types of the table fields. */
            final def getFieldTypes: Array[TypeInformation[_]] = {
              getReturnType match {
                case c: CompositeType[T] => 0.until(c.getTotalFields).map(c.getTypeAt(_)).toArray
              }
            }
          
            /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */
            def getReturnType: TypeInformation[T]
          
          }
          

          I think there are two benefits:

          • No need to change existing code
          • We can put all the utility methods for TableSource in TableSource trait

          What do you think about this?

          fhueske Fabian Hueske added a comment -

          Hi Ivan Mushketyk,

          That's an interesting idea!

          I think getFieldTypes() and getNumberOfFields() are truly redundant and might even cause problems if they are not consistent with getReturnType(). We could make them final, but that would change the API as well, so we can also remove them. IMO, it makes sense to break the API here. It's not declared stable and I don't think it is widely used.

          The benefit of keeping getFieldNames() would be that users could still overwrite the names of the TypeInformation by overriding the method. However, if we do that we would need to add a getFieldIndicies() method as well to map names to positions for proper POJO support. The question is whether it is worth keeping getFieldNames and adding getFieldIndicies. I think it makes sense to have these methods. It would be aligned with the BatchTableEnvironment.fromDataSet() methods.

          We could have default implementations for getFieldNames() and getFieldIndicies() that return null and use TableEnvironment.getFieldInfo(TypeInformation) or the explicitly provided information if the methods are overridden. That would allow us to reuse existing code instead of duplicating it.

          What do you think Ivan Mushketyk and Jark Wu?

          ivan.mushketyk Ivan Mushketyk added a comment -

          Hi Fabian Hueske

          We could have default implementations for getFieldNames() and getFieldIndicies() that return null and use TableEnvironment.getFieldInfo(TypeInformation) or the explicitly provided information if the methods are overridden. That would allow us to reuse existing code instead of duplicating it.

          I think this is not a very object-oriented way of doing it. IMO it would be better to provide default implementations of getFieldNames() and getFieldIndicies(), but let the user override these methods. This seems to be a cleaner approach.

          I think getFieldTypes() and getNumberOfFields() are truly redundant and might even cause problems if they are not consistent with getReturnType().

          These methods are used in the current flink-table code base and we need to implement them somewhere, so the question is where to put them. We can put them in a util class of some sort to avoid duplication, or we can put them into the TableSource class and make them final to ensure that they are consistent with getFieldNames(), getFieldIndicies(), and the type information. I am not married to the idea of keeping them in TableSource; it seems a bit cleaner to keep them in a util class.

          So I would propose some combination of my previous idea and Fabian's idea (a sketch follows the list):

          • Add a getFieldIndicies method
          • Add default implementations for getFieldNames and getFieldIndicies based on getReturnType(), but make them overridable
          • Add default implementations for getFieldTypes() and getNumberOfFields(), but make them final
          • Leave getReturnType() unimplemented in the trait. This will be the only method a user needs to implement in the simplest case (see the sketch below).
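
A minimal sketch of that proposed trait shape (illustrative only; it assumes getFieldInfo/getFieldTypes are statically accessible on TableEnvironment, which is exactly what is proposed next):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.TableEnvironment

trait TableSource[T] {

  /** The only method a user must implement in the simplest case. */
  def getReturnType: TypeInformation[T]

  /** Overridable default derived from the return type. */
  def getFieldNames: Array[String] = TableEnvironment.getFieldInfo(getReturnType)._1

  /** Overridable default: positions of the fields in the return type. */
  def getFieldIndicies: Array[Int] = TableEnvironment.getFieldInfo(getReturnType)._2

  /** Final, so it can never diverge from getReturnType. */
  final def getFieldTypes: Array[TypeInformation[_]] = TableEnvironment.getFieldTypes(getReturnType)

  /** Final, derived from the field names. */
  final def getNumberOfFields: Int = getFieldNames.length
}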

What do you think, Fabian Hueske, Jark Wu?

          fhueske Fabian Hueske added a comment -

          How about the following:

          • We make the TableEnvironment.getFieldInfo() methods statically accessible, i.e., we move them into the TableEnvironment companion object and call them from the default implementations of TableSource.getFieldNames() and TableSource.getFieldIndicies(). This way we avoid code duplication.
          • We remove getFieldTypes() and getNumberOfFields(). These methods are in fact not necessary for the interface. Have a look at how a regular DataSet is converted into a Table. The field types are extracted in FlinkTable. TableSourceTable overrides this logic. We could simply keep the FlinkTable logic.
          jark Jark Wu added a comment - - edited

          Hi guys,

This is a very good extension of the discussion. It seems that I'm late to the discussion, but I will still post my ideas below.

I think getFieldTypes() and getNumberOfFields() can always be derived from getReturnType(), so I would like to move them into a util class; that will keep the interface clean. Actually, we already have UserDefinedFunctionUtil.getFieldInfo(TypeInformation) and TableEnvironment.getFieldInfo(TypeInformation), which return field types, names, and indices. We can refactor them, move them to a better place, and maybe add split variants (i.e., getFieldNames, getFieldIndicies, getFieldTypes).

We can then provide the default implementations of TableSource.getFieldNames and TableSource.getFieldIndicies based on the util.

          fhueske Fabian Hueske added a comment -

          It seems that these util methods have been implemented a couple of times.

As I said, there is also TableEnvironment.getFieldInfo() for names and indices, and FlinkTable for the types. It would be good to have all of that in one place.

          ivan.mushketyk Ivan Mushketyk added a comment -

          Hi guys,

I like your idea, Fabian. I'll start working on it and see how it goes.

          fhueske Fabian Hueske added a comment -

          Great, I just merged FLINK-5348

          fhueske Fabian Hueske added a comment -

Maybe we can still squeeze this change into the 1.2.0 release.
That would be good since we are changing the interface of TableSource.

          jark Jark Wu added a comment -

Sounds great, +1 to go into 1.2 if we can catch up.

          githubbot ASF GitHub Bot added a comment -

          GitHub user mushketyk opened a pull request:

          https://github.com/apache/flink/pull/3039

          FLINK-5280 Update TableSource to support nested data

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [x] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [x] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [x] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/mushketyk/flink nested-table-source

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3039.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3039


          commit 7bd26239dd9a7c41f09fbe070baa70b19278c51f
          Author: Ivan Mushketyk <ivan.mushketik@gmail.com>
          Date: 2016-12-22T21:26:34Z

          FLINK-5280 Update TableSource to support nested data


          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93755631

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
@@ -535,4 +509,74 @@ object TableEnvironment
     new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]].
+    *
+    * Field names are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation extract the field names and positions from.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+    validateType(inputType)
+
+    val fieldNames: Array[String] = inputType match {
+      case t: TupleTypeInfo[A] => t.getFieldNames
+      case c: CaseClassTypeInfo[A] => c.getFieldNames
+      case p: PojoTypeInfo[A] => p.getFieldNames
+      case r: RowTypeInfo => r.getFieldNames
+      case tpe =>
+        throw new TableException(s"Type $tpe lacks explicit field naming")
+    }
+    val fieldIndexes = fieldNames.indices.toArray
+
+    if (fieldNames.contains("*")) {
+      throw new TableException("Field name can not be '*'.")
+    }
+
+    (fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+    val clazz = typeInfo.getTypeClass
+    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+      !Modifier.isPublic(clazz.getModifiers) ||
+      clazz.getCanonicalName == null) {
+      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+        s"static and globally accessible.")
+    }
+  }
+
+  /**
+    * Returns field types for a given [[TypeInformation]].
+    *
+    * Field types are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation to extract field types from.
+    * @return an holding the field types.
+    */
+  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
+    validateType(inputType)
+
+    inputType match {
+      case t: TupleTypeInfo[_] => getTypes(t)
+      case c: CaseClassTypeInfo[_] => getTypes(c)
+      case p: PojoTypeInfo[_] => getTypes(p)
+      case r: RowTypeInfo => getTypes(r)
— End diff –

What about using `case c: CompositeType => c.getFieldNames` instead of a case for every type?
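
In other words, roughly (a sketch of the suggested simplification; CompositeType is the common supertype of all four matched types and already exposes getFieldNames; the function name is illustrative):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.TableException

def extractFieldNames(inputType: TypeInformation[_]): Array[String] = inputType match {
  case c: CompositeType[_] => c.getFieldNames
  case tpe => throw new TableException(s"Type $tpe lacks explicit field naming")
}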

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93758640

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala —
          @@ -44,14 +44,14 @@ abstract class FlinkTable[T](

          val fieldTypes: Array[TypeInformation[_]] =
          — End diff –

          We can use the util `getFieldTypes` to generate the TypeInformation array to reduce redundant code. And we can put the length check above.
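
i.e., something along these lines (a sketch; it assumes the getFieldTypes util added to the TableEnvironment companion object in this PR, and the method name is illustrative):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableEnvironment, TableException}

def deriveFieldTypes(
    typeInfo: TypeInformation[_],
    fieldNames: Array[String],
    fieldIndexes: Array[Int]): Array[TypeInformation[_]] = {
  // length check first, as suggested in the review
  if (fieldIndexes.length != fieldNames.length) {
    throw new TableException("Number of field indexes and field names must be equal.")
  }
  // reuse the util instead of re-implementing the type extraction
  TableEnvironment.getFieldTypes(typeInfo)
}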

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93756907

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -19,21 +19,32 @@
          package org.apache.flink.table.sources

          import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.TableEnvironment

          -/** Defines an external table by providing schema information, i.e., field names and types.
          +/** Defines an external table by providing schema information and used to produce a
          + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
          + * Schema information consists of a data type, field names, and corresponding indices of
          + * these names in the data type.
          + *
          + * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
          — End diff –

          `TableSource.getReturnType` -> `TableSource#getReturnType`, use `#` instead of `.` in javadoc.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93756028

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
@@ -535,4 +509,74 @@ object TableEnvironment
     new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]].
+    *
+    * Field names are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation extract the field names and positions from.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+    validateType(inputType)
+
+    val fieldNames: Array[String] = inputType match {
+      case t: TupleTypeInfo[A] => t.getFieldNames
+      case c: CaseClassTypeInfo[A] => c.getFieldNames
+      case p: PojoTypeInfo[A] => p.getFieldNames
+      case r: RowTypeInfo => r.getFieldNames
+      case tpe =>
+        throw new TableException(s"Type $tpe lacks explicit field naming")
+    }
+    val fieldIndexes = fieldNames.indices.toArray
+
+    if (fieldNames.contains("*")) {
+      throw new TableException("Field name can not be '*'.")
+    }
+
+    (fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
— End diff –

Could you add a comment describing what this method does?

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93757413

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
@@ -535,4 +509,74 @@ object TableEnvironment
     new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]].
+    *
+    * Field names are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation extract the field names and positions from.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+    validateType(inputType)
+
+    val fieldNames: Array[String] = inputType match {
+      case t: TupleTypeInfo[A] => t.getFieldNames
+      case c: CaseClassTypeInfo[A] => c.getFieldNames
+      case p: PojoTypeInfo[A] => p.getFieldNames
+      case r: RowTypeInfo => r.getFieldNames
+      case tpe =>
+        throw new TableException(s"Type $tpe lacks explicit field naming")
+    }
+    val fieldIndexes = fieldNames.indices.toArray
+
+    if (fieldNames.contains("*")) {
+      throw new TableException("Field name can not be '*'.")
+    }
+
+    (fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+    val clazz = typeInfo.getTypeClass
+    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+      !Modifier.isPublic(clazz.getModifiers) ||
+      clazz.getCanonicalName == null) {
+      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+        s"static and globally accessible.")
+    }
+  }
+
+  /**
+    * Returns field types for a given [[TypeInformation]].
+    *
+    * Field types are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation to extract field types from.
+    * @return an holding the field types.
+    */
+  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
+    validateType(inputType)
+
+    inputType match {
+      case t: TupleTypeInfo[_] => getTypes(t)
+      case c: CaseClassTypeInfo[_] => getTypes(c)
+      case p: PojoTypeInfo[_] => getTypes(p)
+      case r: RowTypeInfo => getTypes(r)
+      case tpe =>
+        throw new TableException(s"Type $tpe lacks explicit field naming")
+    }
+  }
+
+  private def getTypes(c: CompositeType[_]): Array[TypeInformation[_]] = {
+    0.until(c.getTotalFields).map(c.getTypeAt(_)).toArray
— End diff –

`c.getTypeAt(_)` can be simplified to `c.getTypeAt`.
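
The two spellings are equivalent; the suggestion just drops the redundant placeholder, relying on eta-expansion to turn the method into a function value (imports as in the quoted file):

private def getTypes(c: CompositeType[_]): Array[TypeInformation[_]] =
  0.until(c.getTotalFields).map(c.getTypeAt).toArray  // eta-expanded method reference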

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93758724

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala —
          @@ -25,6 +25,6 @@ import org.apache.flink.types.Row
          /** Table which defines an external table via a [[TableSource]] */
          — End diff –

          unused import `org.apache.flink.api.java.typeutils.RowTypeInfo`

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93756487

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources

 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment

-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
+  * field names and field indices are deducted from the returned type.
+  *
+  * In case if custom field names are required one need to implement both
+  * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
   *
   * @tparam T The return type of the [[TableSource]].
   */
 trait TableSource[T] {
-  /** Returns the number of fields of the table. */
-  def getNumberOfFields: Int
-
   /** Returns the names of the table fields. */
-  def getFieldsNames: Array[String]
-
-  /** Returns the types of the table fields. */
-  def getFieldTypes: Array[TypeInformation[_]]
+  def getFieldsNames: Array[String] = {
+    TableEnvironment.getFieldInfo(getReturnType)._1
+  }
+
+  /** Returns the indices of the table fields. */
+  def getFieldsIndices: Array[Int] = {
+    getFieldsNames.indices.toArray
— End diff –

          I think this could be simplified to `getReturnType.getArity.indices.toArray`.
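
Note that getArity returns an Int, so taken literally the snippet would not compile; presumably the intent is a range over the arity, e.g. (a sketch):

def getFieldsIndices: Array[Int] =
  (0 until getReturnType.getArity).toArray  // 0, 1, ..., arity - 1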

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93758297

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
@@ -535,4 +509,74 @@ object TableEnvironment
     new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]].
+    *
+    * Field names are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation extract the field names and positions from.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+    validateType(inputType)
+
+    val fieldNames: Array[String] = inputType match {
+      case t: TupleTypeInfo[A] => t.getFieldNames
+      case c: CaseClassTypeInfo[A] => c.getFieldNames
+      case p: PojoTypeInfo[A] => p.getFieldNames
+      case r: RowTypeInfo => r.getFieldNames
+      case tpe =>
+        throw new TableException(s"Type $tpe lacks explicit field naming")
+    }
+    val fieldIndexes = fieldNames.indices.toArray
+
+    if (fieldNames.contains("*")) {
+      throw new TableException("Field name can not be '*'.")
+    }
+
+    (fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+    val clazz = typeInfo.getTypeClass
+    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+      !Modifier.isPublic(clazz.getModifiers) ||
+      clazz.getCanonicalName == null) {
+      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+        s"static and globally accessible.")
+    }
+  }
+
+  /**
+    * Returns field types for a given [[TypeInformation]].
+    *
+    * Field types are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation to extract field types from.
+    * @return an holding the field types.
+    */
+  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
+    validateType(inputType)
+
+    inputType match {
+      case t: TupleTypeInfo[_] => getTypes(t)
+      case c: CaseClassTypeInfo[_] => getTypes(c)
+      case p: PojoTypeInfo[_] => getTypes(p)
+      case r: RowTypeInfo => getTypes(r)
+      case tpe =>
— End diff –

          Same here, should support `AtomicType`.
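
A sketch of such an extension, treating an atomic input as a single-column table (illustrative, not necessarily the merged code; the validateType call is omitted for brevity):

import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.TableException

def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = inputType match {
  case c: CompositeType[_] => (0 until c.getTotalFields).map(c.getTypeAt).toArray
  case _: AtomicType[_]    => Array(inputType) // one column of the atomic type itself
  case tpe                 => throw new TableException(s"Unsupported type $tpe")
}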

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93757097

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java —
@@ -111,24 +112,17 @@
 		return kafkaSource;
 	}

-	@Override
-	public int getNumberOfFields() {
-		return fieldNames.length;
-	}
-
 	@Override
 	public String[] getFieldsNames() {
-		return fieldNames;
+		return TableSource$class.getFieldsNames(this);
— End diff –

Why do we do it like this? It seems that we do not need to override this method.
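
(A likely reason: flink-table is compiled with pre-2.12 Scala, where trait method bodies are placed in a generated static helper class, here TableSource$class. A Java class such as KafkaTableSource therefore cannot inherit the trait's default bodies and has to forward to them explicitly; whether it should override at all is the open question.)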

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93757105

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java —
@@ -111,24 +112,17 @@
 		return kafkaSource;
 	}

-	@Override
-	public int getNumberOfFields() {
-		return fieldNames.length;
-	}
-
 	@Override
 	public String[] getFieldsNames() {
-		return fieldNames;
+		return TableSource$class.getFieldsNames(this);
 	}

-	@Override
-	public TypeInformation<?>[] getFieldTypes() {
-		return fieldTypes;
+	public int[] getFieldsIndices() {
+		return TableSource$class.getFieldsIndices(this);
— End diff –

Why do we do it like this? It seems that we do not need to override this method.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93756642

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -19,21 +19,32 @@
          package org.apache.flink.table.sources

          import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.TableEnvironment

          -/** Defines an external table by providing schema information, i.e., field names and types.
          +/** Defines an external table by providing schema information and used to produce a
          + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
          + * Schema information consists of a data type, field names, and corresponding indices of
          + * these names in the data type.
          + *
          + * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
          + * field names and field indices are deducted from the returned type.
          — End diff –

          deducted -> derived ?

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93757999

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
@@ -340,25 +331,8 @@ abstract class TableEnvironment(val config: TableConfig) {

    * @return A tuple of two arrays holding the field names and corresponding field positions.
    */
  protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
-  (Array[String], Array[Int]) =
-  {
-    validateType(inputType)
-
-    val fieldNames: Array[String] = inputType match {
-      case t: TupleTypeInfo[A] => t.getFieldNames
-      case c: CaseClassTypeInfo[A] => c.getFieldNames
-      case p: PojoTypeInfo[A] => p.getFieldNames
-      case r: RowTypeInfo => r.getFieldNames
-      case tpe =>
-        throw new TableException(s"Type $tpe lacks explicit field naming")
-    }
-    val fieldIndexes = fieldNames.indices.toArray
-
-    if (fieldNames.contains("*")) {
-      throw new TableException("Field name can not be '*'.")
-    }
-
-    (fieldNames, fieldIndexes)
+  (Array[String], Array[Int]) = {
+    TableEnvironment.getFieldInfo(inputType)
— End diff –

          It seems that this is redundant, we can remove this and use the util `getFieldInfo` instead of calling this method.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93758185

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
@@ -535,4 +509,74 @@ object TableEnvironment
     new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]].
+    *
+    * Field names are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation extract the field names and positions from.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+    validateType(inputType)
+
+    val fieldNames: Array[String] = inputType match {
+      case t: TupleTypeInfo[A] => t.getFieldNames
+      case c: CaseClassTypeInfo[A] => c.getFieldNames
+      case p: PojoTypeInfo[A] => p.getFieldNames
+      case r: RowTypeInfo => r.getFieldNames
+      case tpe =>
— End diff –

          We should extend here to support `AtomicType`. See `UserDefinedFunctionUtil.getFieldInfo(TypeInformation)`. And I would like to combine these methods.
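
For example, the name extraction could fall back to a default name for atomic types, along the lines of (a sketch mirroring the UserDefinedFunctionUtil behavior; the default name "f0" and the function name are assumptions):

import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.TableException

def fieldNamesOf(inputType: TypeInformation[_]): Array[String] = inputType match {
  case c: CompositeType[_] => c.getFieldNames
  case _: AtomicType[_]    => Array("f0") // single implicit field for an atomic type
  case tpe                 => throw new TableException(s"Type $tpe lacks explicit field naming")
}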

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93759808

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala —
          @@ -26,7 +26,7 @@ import org.apache.flink.table.api.TableException
          import org.apache.flink.table.calcite.FlinkTypeFactory

 abstract class FlinkTable[T](
-    val typeInfo: TypeInformation[T],
+    val typeInfo: TypeInformation[_],
— End diff –

          This can be reverted when you make `TableSourceTable` supporting generic type.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93756954

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -19,21 +19,32 @@
          package org.apache.flink.table.sources

          import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.TableEnvironment

          -/** Defines an external table by providing schema information, i.e., field names and types.
          +/** Defines an external table by providing schema information and used to produce a
          + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
          + * Schema information consists of a data type, field names, and corresponding indices of
          + * these names in the data type.
          + *
          + * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
          + * field names and field indices are deducted from the returned type.
          + *
          + * In case if custom field names are required one need to implement both
          + * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
          — End diff –

          `TableSource.getFieldsNames` => `TableSource#getFieldsNames`
          `TableSource.getFieldsIndices` => `TableSource#getFieldsIndices`

          use `#` instead of `.` in javadoc.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93759469

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +509,74 @@ object TableEnvironment

          { new ScalaStreamTableEnv(executionEnvironment, tableConfig) }

          +
          + /**
          + * Returns field names and field positions for a given [[TypeInformation]].
          + *
          + * Field names are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + *
          + * @param inputType The TypeInformation extract the field names and positions from.
          + * @tparam A The type of the TypeInformation.
          + * @return A tuple of two arrays holding the field names and corresponding field positions.
          + */
          + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
          + validateType(inputType)
          +
          + val fieldNames: Array[String] = inputType match {
          + case t: TupleTypeInfo[A] => t.getFieldNames
          + case c: CaseClassTypeInfo[A] => c.getFieldNames
          + case p: PojoTypeInfo[A] => p.getFieldNames
          + case r: RowTypeInfo => r.getFieldNames
          + case tpe =>
          + throw new TableException(s"Type $tpe lacks explicit field naming")
          + }

          + val fieldIndexes = fieldNames.indices.toArray
          +
          + if (fieldNames.contains("*")) {
          + throw new TableException("Field name can not be '*'.")
          + }

          +
          + (fieldNames, fieldIndexes)
          + }
          +
          + def validateType(typeInfo: TypeInformation[_]): Unit = {
          + val clazz = typeInfo.getTypeClass
          + if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
          + !Modifier.isPublic(clazz.getModifiers) ||
          + clazz.getCanonicalName == null) {
          + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
          + s"static and globally accessible.")
          + }

          + }
          +
          + /**
          + * Returns field types for a given [[TypeInformation]].
          + *
          + * Field types are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          — End diff –

          I think we should not restrict the input type to be a `CompositeType`, because `TableSource.getReturnType` could return any type, including an `AtomicType`.

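          As an illustration of this suggestion, here is a minimal sketch of how `getFieldTypes` could handle an `AtomicType` as well. The wrapping object and the single-column fallback are our assumptions, not code from the PR:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType

object FieldTypeUtil {
  // Sketch only: instead of failing on non-composite types, treat an
  // AtomicType as a table with a single (unnamed) column.
  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
    inputType match {
      case c: CompositeType[_] =>
        // one entry per top-level field of the composite type
        (0 until c.getArity).map(i => c.getTypeAt(i): TypeInformation[_]).toArray
      case atomic =>
        // an atomic type carries no field names and maps to one column
        Array(atomic)
    }
  }
}
```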
          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93759055

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala —
          @@ -25,6 +25,6 @@ import org.apache.flink.types.Row
          /** Table which defines an external table via a [[TableSource]] */
          class TableSourceTable(val tableSource: TableSource[_])
          extends FlinkTable[Row](
          — End diff –

          I think it's not necessary to declare the return type is `Row`. `TableSource` could return Pojo in the future.

          So it should be declared like `DataSetTable` and `DataStreamTable`, such as:

          ```scala
          class TableSourceTable[T](val tableSource: TableSource[T])
          extends FlinkTable[T](
          ```

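          Spelled out, the generic declaration could then forward the source's schema directly. This is a sketch only, assuming `FlinkTable`'s constructor takes the type information, field indices, and field names the way `DataSetTable` and `DataStreamTable` pass them:

```scala
// Sketch only: parameter names assume a FlinkTable(typeInfo, fieldIndexes,
// fieldNames) constructor, mirroring DataSetTable and DataStreamTable.
class TableSourceTable[T](val tableSource: TableSource[T])
  extends FlinkTable[T](
    typeInfo = tableSource.getReturnType,
    fieldIndexes = tableSource.getFieldsIndices,
    fieldNames = tableSource.getFieldsNames)
```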
          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93757791

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala —
          @@ -38,7 +38,9 @@ class BatchTableSourceScan(

          override def deriveRowType() = {
          val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]

          - flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
          + flinkTypeFactory.buildRowDataType(
          + tableSource.getFieldsNames,
          + TableEnvironment.getFieldTypes(tableSource.getReturnType))

          — End diff –

          What if the table source returns an `AtomicType`? It seems that it will fail here.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93755496

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +509,74 @@ object TableEnvironment

          { new ScalaStreamTableEnv(executionEnvironment, tableConfig) }

          +
          + /**
          + * Returns field names and field positions for a given [[TypeInformation]].
          + *
          + * Field names are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + *
          + * @param inputType The TypeInformation extract the field names and positions from.
          + * @tparam A The type of the TypeInformation.
          + * @return A tuple of two arrays holding the field names and corresponding field positions.
          + */
          + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
          + validateType(inputType)
          +
          + val fieldNames: Array[String] = inputType match {
          + case t: TupleTypeInfo[A] => t.getFieldNames
          + case c: CaseClassTypeInfo[A] => c.getFieldNames
          + case p: PojoTypeInfo[A] => p.getFieldNames
          + case r: RowTypeInfo => r.getFieldNames
          — End diff –

          What about using `case c: CompositeType => c.getFieldNames` instead of a case for every type?

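          A sketch of what this simplification could look like (the wrapping object is ours; `TupleTypeInfo`, `CaseClassTypeInfo`, `PojoTypeInfo`, and `RowTypeInfo` all extend `CompositeType`, which already exposes `getFieldNames`):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.TableException

object FieldInfoUtil {
  // Sketch only: a single CompositeType case replaces the per-type cases,
  // and also covers any composite type added in the future.
  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
    val fieldNames: Array[String] = inputType match {
      case c: CompositeType[_] => c.getFieldNames
      case tpe =>
        throw new TableException(s"Type $tpe lacks explicit field naming")
    }
    if (fieldNames.contains("*")) {
      throw new TableException("Field name can not be '*'.")
    }
    (fieldNames, fieldNames.indices.toArray)
  }
}
```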
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93761407

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +509,74 @@ object TableEnvironment

          { new ScalaStreamTableEnv(executionEnvironment, tableConfig) }

          +
          + /**
          + * Returns field names and field positions for a given [[TypeInformation]].
          + *
          + * Field names are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + *
          + * @param inputType The TypeInformation extract the field names and positions from.
          + * @tparam A The type of the TypeInformation.
          + * @return A tuple of two arrays holding the field names and corresponding field positions.
          + */
          + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
          + validateType(inputType)
          +
          + val fieldNames: Array[String] = inputType match {
          + case t: TupleTypeInfo[A] => t.getFieldNames
          + case c: CaseClassTypeInfo[A] => c.getFieldNames
          + case p: PojoTypeInfo[A] => p.getFieldNames
          + case r: RowTypeInfo => r.getFieldNames
          — End diff –

          Good point, will do that.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93761442

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +509,74 @@ object TableEnvironment

          { new ScalaStreamTableEnv(executionEnvironment, tableConfig) }

          +
          + /**
          + * Returns field names and field positions for a given [[TypeInformation]].
          + *
          + * Field names are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + *
          + * @param inputType The TypeInformation extract the field names and positions from.
          + * @tparam A The type of the TypeInformation.
          + * @return A tuple of two arrays holding the field names and corresponding field positions.
          + */
          + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
          + validateType(inputType)
          +
          + val fieldNames: Array[String] = inputType match {
          + case t: TupleTypeInfo[A] => t.getFieldNames
          + case c: CaseClassTypeInfo[A] => c.getFieldNames
          + case p: PojoTypeInfo[A] => p.getFieldNames
          + case r: RowTypeInfo => r.getFieldNames
          + case tpe =>
          + throw new TableException(s"Type $tpe lacks explicit field naming")
          + }

          + val fieldIndexes = fieldNames.indices.toArray
          +
          + if (fieldNames.contains("*")) {
          + throw new TableException("Field name can not be '*'.")
          + }

          +
          + (fieldNames, fieldIndexes)
          + }
          +
          + def validateType(typeInfo: TypeInformation[_]): Unit = {
          — End diff –

          Sure.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93761548

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +509,74 @@ object TableEnvironment

          { new ScalaStreamTableEnv(executionEnvironment, tableConfig) }

          +
          + /**
          + * Returns field names and field positions for a given [[TypeInformation]].
          + *
          + * Field names are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + *
          + * @param inputType The TypeInformation extract the field names and positions from.
          + * @tparam A The type of the TypeInformation.
          + * @return A tuple of two arrays holding the field names and corresponding field positions.
          + */
          + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
          + validateType(inputType)
          +
          + val fieldNames: Array[String] = inputType match {
          + case t: TupleTypeInfo[A] => t.getFieldNames
          + case c: CaseClassTypeInfo[A] => c.getFieldNames
          + case p: PojoTypeInfo[A] => p.getFieldNames
          + case r: RowTypeInfo => r.getFieldNames
          + case tpe =>
          + throw new TableException(s"Type $tpe lacks explicit field naming")
          + }

          + val fieldIndexes = fieldNames.indices.toArray
          +
          + if (fieldNames.contains("*")) {
          + throw new TableException("Field name can not be '*'.")
          + }

          +
          + (fieldNames, fieldIndexes)
          + }
          +
          + def validateType(typeInfo: TypeInformation[_]): Unit = {
          + val clazz = typeInfo.getTypeClass
          + if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
          + !Modifier.isPublic(clazz.getModifiers) ||
          + clazz.getCanonicalName == null) {
          + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
          + s"static and globally accessible.")
          + }

          + }
          +
          + /**
          + * Returns field types for a given [[TypeInformation]].
          + *
          + * Field types are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          — End diff –

          Ok, this makes sense.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93761556

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +509,74 @@ object TableEnvironment

          { new ScalaStreamTableEnv(executionEnvironment, tableConfig) }

          +
          + /**
          + * Returns field names and field positions for a given [[TypeInformation]].
          + *
          + * Field names are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + *
          + * @param inputType The TypeInformation extract the field names and positions from.
          + * @tparam A The type of the TypeInformation.
          + * @return A tuple of two arrays holding the field names and corresponding field positions.
          + */
          + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
          + validateType(inputType)
          +
          + val fieldNames: Array[String] = inputType match {
          + case t: TupleTypeInfo[A] => t.getFieldNames
          + case c: CaseClassTypeInfo[A] => c.getFieldNames
          + case p: PojoTypeInfo[A] => p.getFieldNames
          + case r: RowTypeInfo => r.getFieldNames
          + case tpe =>
          + throw new TableException(s"Type $tpe lacks explicit field naming")
          + }

          + val fieldIndexes = fieldNames.indices.toArray
          +
          + if (fieldNames.contains("*")) {
          + throw new TableException("Field name can not be '*'.")
          + }

          +
          + (fieldNames, fieldIndexes)
          + }
          +
          + def validateType(typeInfo: TypeInformation[_]): Unit = {
          + val clazz = typeInfo.getTypeClass
          + if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
          + !Modifier.isPublic(clazz.getModifiers) ||
          + clazz.getCanonicalName == null) {
          + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
          + s"static and globally accessible.")
          + }

          + }
          +
          + /**
          + * Returns field types for a given [[TypeInformation]].
          + *
          + * Field types are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + *
          + * @param inputType The TypeInformation to extract field types from.
          + * @return an holding the field types.
          + */
          + def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
          + validateType(inputType)
          +
          + inputType match {
          + case t: TupleTypeInfo[_] => getTypes(t)
          + case c: CaseClassTypeInfo[_] => getTypes(c)
          + case p: PojoTypeInfo[_] => getTypes(p)
          + case r: RowTypeInfo => getTypes(r)
          — End diff –

          Ok, will update.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93761574

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +509,74 @@ object TableEnvironment

          { new ScalaStreamTableEnv(executionEnvironment, tableConfig) }

          +
          + /**
          + * Returns field names and field positions for a given [[TypeInformation]].
          + *
          + * Field names are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + *
          + * @param inputType The TypeInformation extract the field names and positions from.
          + * @tparam A The type of the TypeInformation.
          + * @return A tuple of two arrays holding the field names and corresponding field positions.
          + */
          + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
          + validateType(inputType)
          +
          + val fieldNames: Array[String] = inputType match {
          + case t: TupleTypeInfo[A] => t.getFieldNames
          + case c: CaseClassTypeInfo[A] => c.getFieldNames
          + case p: PojoTypeInfo[A] => p.getFieldNames
          + case r: RowTypeInfo => r.getFieldNames
          + case tpe =>
          + throw new TableException(s"Type $tpe lacks explicit field naming")
          + }

          + val fieldIndexes = fieldNames.indices.toArray
          +
          + if (fieldNames.contains("*")) {
          + throw new TableException("Field name can not be '*'.")
          + }

          +
          + (fieldNames, fieldIndexes)
          + }
          +
          + def validateType(typeInfo: TypeInformation[_]): Unit = {
          + val clazz = typeInfo.getTypeClass
          + if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
          + !Modifier.isPublic(clazz.getModifiers) ||
          + clazz.getCanonicalName == null) {
          + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
          + s"static and globally accessible.")
          + }

          + }
          +
          + /**
          + * Returns field types for a given [[TypeInformation]].
          + *
          + * Field types are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]].
          + *
          + * @param inputType The TypeInformation to extract field types from.
          + * @return an holding the field types.
          + */
          + def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
          + validateType(inputType)
          +
          + inputType match {
          + case t: TupleTypeInfo[_] => getTypes(t)
          + case c: CaseClassTypeInfo[_] => getTypes(c)
          + case p: PojoTypeInfo[_] => getTypes(p)
          + case r: RowTypeInfo => getTypes(r)
          + case tpe =>
          + throw new TableException(s"Type $tpe lacks explicit field naming")
          + }

          + }
          +
          + private def getTypes(c: CompositeType[_]): Array[TypeInformation[_]] = {
          + 0.until(c.getTotalFields).map(c.getTypeAt(_)).toArray
          — End diff –

          Good point.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93761791

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -19,21 +19,32 @@
          package org.apache.flink.table.sources

          import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.TableEnvironment

          -/** Defines an external table by providing schema information, i.e., field names and types.
          +/** Defines an external table by providing schema information and used to produce a
          + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
          + * Schema information consists of a data type, field names, and corresponding indices of
          + * these names in the data type.
          + *
          + * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
          + * field names and field indices are deducted from the returned type.
          — End diff –

          Good point.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93761823

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -19,21 +19,32 @@
          package org.apache.flink.table.sources

          import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.TableEnvironment

          -/** Defines an external table by providing schema information, i.e., field names and types.
          +/** Defines an external table by providing schema information and used to produce a
          + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
          + * Schema information consists of a data type, field names, and corresponding indices of
          + * these names in the data type.
          + *
          + * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
          — End diff –

          Aren't we using scaladoc here? I thought it's different in scaladoc.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93761841

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -19,21 +19,32 @@
          package org.apache.flink.table.sources

          import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.TableEnvironment

          -/** Defines an external table by providing schema information, i.e., field names and types.
          +/** Defines an external table by providing schema information and used to produce a
          + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
          + * Schema information consists of a data type, field names, and corresponding indices of
          + * these names in the data type.
          + *
          + * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
          + * field names and field indices are deducted from the returned type.
          + *
          + * In case if custom field names are required one need to implement both
          + * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
          *

          * @tparam T The return type of the [[TableSource]].
          */
          trait TableSource[T] {

          - /** Returns the number of fields of the table. */
          - def getNumberOfFields: Int
          -
          /** Returns the names of the table fields. */
          - def getFieldsNames: Array[String]
          -
          - /** Returns the types of the table fields. */
          - def getFieldTypes: Array[TypeInformation[_]]
          + def getFieldsNames: Array[String] = {
          + TableEnvironment.getFieldInfo(getReturnType)._1
          + }
          +
          + /** Returns the indices of the table fields. */
          + def getFieldsIndices: Array[Int] = {
          + getFieldsNames.indices.toArray

          — End diff –

          Ok, good point.

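          With these default implementations in place, a table source would only need to describe its return type; field names and indices then fall out of the trait defaults. A hypothetical minimal example (class name, schema, and the `BatchTableSource` method signature are our assumptions for illustration):

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.types.Row

// Hypothetical source: only getReturnType is implemented, so getFieldsNames
// and getFieldsIndices come from the trait's default implementations.
class PersonTableSource extends BatchTableSource[Row] {

  override def getReturnType: TypeInformation[Row] =
    new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
      Array("name", "age"))

  // Producing the actual data is out of scope for this sketch.
  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = ???
}
```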
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93761979

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java —
          @@ -111,24 +112,17 @@
          return kafkaSource;
          }

          - @Override
          - public int getNumberOfFields() {
          - return fieldNames.length;
          - }
          -
          @Override
          public String[] getFieldsNames() {
          - return fieldNames;
          + return TableSource$class.getFieldsNames(this);

          — End diff –

          We do not override this method. As far as I understand, a Java class cannot inherit a method implementation from a Scala trait: http://stackoverflow.com/a/7637888

          Am I missing something?

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93761987

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java —
          @@ -111,24 +112,17 @@
          return kafkaSource;
          }

          - @Override
          - public int getNumberOfFields() {
          - return fieldNames.length;
          - }
          -
          @Override
          public String[] getFieldsNames() {
          - return fieldNames;
          + return TableSource$class.getFieldsNames(this);
          }

          - @Override
          - public TypeInformation<?>[] getFieldTypes() {
          - return fieldTypes;
          + public int[] getFieldsIndices() {
          + return TableSource$class.getFieldsIndices(this);

          — End diff –

          Ditto.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93762016

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala —
          @@ -25,6 +25,6 @@ import org.apache.flink.types.Row
          /** Table which defines an external table via a [[TableSource]] */
          — End diff –

          Good point.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93762127

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala —
          @@ -38,7 +38,9 @@ class BatchTableSourceScan(

          override def deriveRowType() = {
          val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]

          - flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
          + flinkTypeFactory.buildRowDataType(
          + tableSource.getFieldsNames,
          + TableEnvironment.getFieldTypes(tableSource.getReturnType))

          — End diff –

          This should be fine if we add support for `AtomicType` in `TableEnvironment.getFieldTypes`.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93762178

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala —
          @@ -44,14 +44,14 @@ abstract class FlinkTable[T](

          val fieldTypes: Array[TypeInformation[_]] =
          — End diff –

          Ok, good point.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93762225

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala —
          @@ -25,6 +25,6 @@ import org.apache.flink.types.Row
          /** Table which defines an external table via a [[TableSource]] */
          class TableSourceTable(val tableSource: TableSource[_])
          extends FlinkTable[Row](
          — End diff –

          Ok, good point.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/3039

          Hi @wuchong

          Thank you for the review. I'll try to update the PR today.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93763491

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -19,21 +19,32 @@
          package org.apache.flink.table.sources

          import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.TableEnvironment

          -/** Defines an external table by providing schema information, i.e., field names and types.
          +/** Defines an external table by providing schema information and used to produce a
          + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
          + * Schema information consists of a data type, field names, and corresponding indices of
          + * these names in the data type.
          + *
          + * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
          — End diff –

          I'm not sure about that. My IDEA highlights `[[TableSource.getReturnType]]` as an error, but `[[TableSource#getReturnType]]` is fine.

          IDEA 2016.3.1, Scala plugin 2016.3.5

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93763884

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java —
          @@ -111,24 +112,17 @@
          return kafkaSource;
          }

          - @Override
          - public int getNumberOfFields() {
          - return fieldNames.length;
          - }
          -
          @Override
          public String[] getFieldsNames() {
          - return fieldNames;
          + return TableSource$class.getFieldsNames(this);

          — End diff –

          What about making `TableSource` an abstract class, so that it works with both Java and Scala without resorting to hacks?

          In that case, `StreamTableSource` and `BatchTableSource` would have to be abstract classes too.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93764349

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java —
          @@ -111,24 +112,17 @@
           		return kafkaSource;
           	}

          -	@Override
          -	public int getNumberOfFields() {
          -		return fieldNames.length;
          -	}
          -
           	@Override
           	public String[] getFieldsNames() {
          -		return fieldNames;
          +		return TableSource$class.getFieldsNames(this);

          — End diff –

          `CsvTableSource` inherits both `StreamTableSource` and `BatchTableSource`, so they have to be traits.

          I don't think that adding a method that only calls the implementation from a trait is a big issue. In any case, we do not duplicate code and do not re-implement methods.

          Do you have any concerns about this?
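
          To illustrate the constraint being argued here, a minimal sketch (stub definitions, hypothetical) of why both source types must stay traits: a class can mix in any number of traits but extend at most one class.

          ```scala
          trait TableSource[T]
          trait BatchTableSource[T] extends TableSource[T]
          trait StreamTableSource[T] extends TableSource[T]

          // Compiles only because the two parents are traits; if they were
          // abstract classes, this multiple inheritance would be rejected.
          class CsvTableSource extends BatchTableSource[String] with StreamTableSource[String]
          ```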

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93780163

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java —
          @@ -111,24 +112,17 @@
           		return kafkaSource;
           	}

          -	@Override
          -	public int getNumberOfFields() {
          -		return fieldNames.length;
          -	}
          -
           	@Override
           	public String[] getFieldsNames() {
          -		return fieldNames;
          +		return TableSource$class.getFieldsNames(this);

          — End diff –

          Makes sense to me. It seems that we have to keep them as traits.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93797831

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -340,25 +331,8 @@ abstract class TableEnvironment(val config: TableConfig) {

             * @return A tuple of two arrays holding the field names and corresponding field positions.
             */
            protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
          -    (Array[String], Array[Int]) =
          -  {
          -    validateType(inputType)
          -
          -    val fieldNames: Array[String] = inputType match {
          -      case t: TupleTypeInfo[A] => t.getFieldNames
          -      case c: CaseClassTypeInfo[A] => c.getFieldNames
          -      case p: PojoTypeInfo[A] => p.getFieldNames
          -      case r: RowTypeInfo => r.getFieldNames
          -      case tpe =>
          -        throw new TableException(s"Type $tpe lacks explicit field naming")
          -    }
          -    val fieldIndexes = fieldNames.indices.toArray
          -
          -    if (fieldNames.contains("*")) {
          -      throw new TableException("Field name can not be '*'.")
          -    }
          -
          -    (fieldNames, fieldIndexes)
          +    (Array[String], Array[Int]) = {
          +    TableEnvironment.getFieldInfo(inputType)

          — End diff –

          It is overridden in a subclass, so I decided to leave this method here and only move the body out of it to make it reusable.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/3039

          Hi @wuchong

          I've updated the PR according to your comments. Could you please review it again?

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93816520

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -268,23 +268,9 @@ object UserDefinedFunctionUtils {
          def getFieldInfo(inputType: TypeInformation[_])
          — End diff –

          Can we move this method into the `TableEnvironment` object? It's better to put the field-info-related methods in one place.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93816761

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -552,6 +556,10 @@ object TableEnvironment {
          }
          }

          + def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = {
          — End diff –

          The plural of index should be indices. And please add a comment to describe what this method does.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93818006

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +510,77 @@ object TableEnvironment {
               new ScalaStreamTableEnv(executionEnvironment, tableConfig)
             }

          +
          +  /**
          +    * Returns field names and field positions for a given [[TypeInformation]].
          +    *
          +    * Field names are automatically extracted for
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
          +    * The method fails if inputType is not a
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].

          — End diff –

          The comment should be updated since AtomicType is also supported.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93817908

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java —
          @@ -111,24 +112,17 @@
           		return kafkaSource;
           	}

          -	@Override
          -	public int getNumberOfFields() {
          -		return fieldNames.length;
          -	}
          -
           	@Override
           	public String[] getFieldsNames() {
          -		return fieldNames;
          +		return TableSource$class.getFieldsNames(this);
           	}

           	@Override
          -	public TypeInformation<?>[] getFieldTypes() {
          -		return fieldTypes;
          +	public int[] getFieldsIndexes() {

          — End diff –

          It would be better to comment on why we do this. And `getFieldsIndexes` -> `getFieldsIndices`.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93816786

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -522,26 +523,29 @@ object TableEnvironment {

             * @tparam A The type of the TypeInformation.
             * @return A tuple of two arrays holding the field names and corresponding field positions.
             */
          -  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
          +  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {

          — End diff –

          The javadoc should be updated since the method changed.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93818018

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +510,77 @@ object TableEnvironment {
               new ScalaStreamTableEnv(executionEnvironment, tableConfig)
             }

          +
          +  /**
          +    * Returns field names and field positions for a given [[TypeInformation]].
          +    *
          +    * Field names are automatically extracted for
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
          +    * The method fails if inputType is not a
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
          +    *
          +    * @param inputType The TypeInformation extract the field names and positions from.
          +    * @tparam A The type of the TypeInformation.
          +    * @return A tuple of two arrays holding the field names and corresponding field positions.

          — End diff –

          Please update the javadoc.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93816695

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -26,24 +26,24 @@ import org.apache.flink.table.api.TableEnvironment

             * Schema information consists of a data type, field names, and corresponding indices of
             * these names in the data type.
             *
          -  * To define a TableSource one need to implement [[TableSource.getReturnType]]. In this case
          -  * field names and field indices are deducted from the returned type.
          +  * To define a TableSource one need to implement [TableSource#getReturnType]. In this case
          +  * field names and field indices are derived from the returned type.
             *
             * In case if custom field names are required one need to implement both
          -  * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
          +  * [TableSource#getFieldsNames] and [TableSource#getFieldsIndices].
             *
             * @tparam T The return type of the [[TableSource]].
             */
           trait TableSource[T] {

             /** Returns the names of the table fields. */
             def getFieldsNames: Array[String] = {
          -    TableEnvironment.getFieldInfo(getReturnType)._1
          +    TableEnvironment.getFieldNames(getReturnType)
             }

             /** Returns the indices of the table fields. */
          -  def getFieldsIndices: Array[Int] = {
          -    getFieldsNames.indices.toArray
          +  def getFieldsIndexes: Array[Int] = {

          — End diff –

          The plural of index should be indices.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93816451

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -566,17 +574,13 @@ object TableEnvironment {
             def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
               validateType(inputType)

          -    inputType match {
          -      case t: TupleTypeInfo[_] => getTypes(t)
          -      case c: CaseClassTypeInfo[_] => getTypes(c)
          -      case p: PojoTypeInfo[_] => getTypes(p)
          -      case r: RowTypeInfo => getTypes(r)
          -      case tpe =>
          -        throw new TableException(s"Type $tpe lacks explicit field naming")
          +    getFieldNames(inputType).map { i =>

          — End diff –

          I think this may be error-prone. The field types array should be mapped by indices, not by field names. For example, a PojoType's field names' order is not necessarily equal to its field types' order. The original code in `UserDefinedFunctionUtils.getFieldInfo` may be wrong too.
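
          The concern can be sidestepped by deriving the types positionally. A minimal sketch of that idea (the helper object and method name are hypothetical; `CompositeType#getTypeAt(Int)` and `getArity` are the existing accessors):

          ```scala
          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.api.common.typeutils.CompositeType

          object FieldTypeUtil {
            /** Collects field types by positional index, so the result cannot drift
              * from the physical field order the way a name-based lookup can for
              * types such as PojoTypeInfo, which sorts its fields by name. */
            def getFieldTypesByIndex(inputType: CompositeType[_]): Array[TypeInformation[_]] =
              (0 until inputType.getArity)
                .map(i => inputType.getTypeAt[Any](i): TypeInformation[_])
                .toArray
          }
          ```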

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93818027

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +510,77 @@ object TableEnvironment {
               new ScalaStreamTableEnv(executionEnvironment, tableConfig)
             }

          +
          +  /**
          +    * Returns field names and field positions for a given [[TypeInformation]].
          +    *
          +    * Field names are automatically extracted for
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
          +    * The method fails if inputType is not a
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
          +    *
          +    * @param inputType The TypeInformation extract the field names and positions from.
          +    * @tparam A The type of the TypeInformation.
          +    * @return A tuple of two arrays holding the field names and corresponding field positions.
          +    */
          +  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
          +    validateType(inputType)
          +
          +    val fieldNames: Array[String] = inputType match {
          +      case t: CompositeType[_] => t.getFieldNames
          +      case a: AtomicType[_] => Array("f0")
          +      case tpe =>
          +        throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
          +          s"Type $tpe lacks explicit field naming")
          +    }
          +
          +    if (fieldNames.contains("*")) {
          +      throw new TableException("Field name can not be '*'.")
          +    }
          +
          +    fieldNames
          +  }
          +
          +  /**
          +    * Validate if class represented by the typeInfo is static and globally accessible
          +    * @param typeInfo type to check
          +    * @throws TableException if type does not meet these criteria
          +    */
          +  def validateType(typeInfo: TypeInformation[_]): Unit = {
          +    val clazz = typeInfo.getTypeClass
          +    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
          +      !Modifier.isPublic(clazz.getModifiers) ||
          +      clazz.getCanonicalName == null) {
          +      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
          +        s"static and globally accessible.")
          +    }
          +  }
          +
          +  def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = {
          +    getFieldNames(inputType).indices.toArray
          +  }
          +
          +  /**
          +    * Returns field types for a given [[TypeInformation]].
          +    *
          +    * Field types are automatically extracted for
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
          +    * The method fails if inputType is not a
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
          +    *
          +    * @param inputType The TypeInformation to extract field types from.
          +    * @return an holding the field types.

          — End diff –

          An array holding the field types.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93818167

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -19,21 +19,32 @@
           package org.apache.flink.table.sources

           import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.TableEnvironment

          -/** Defines an external table by providing schema information, i.e., field names and types.
          +/** Defines an external table by providing schema information and used to produce a
          +  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
          +  * Schema information consists of a data type, field names, and corresponding indices of
          +  * these names in the data type.
          +  *
          +  * To define a TableSource one need to implement [TableSource#getReturnType]. In this case
          +  * field names and field indices are derived from the returned type.
          +  *
          +  * In case if custom field names are required one need to implement both
          +  * [TableSource#getFieldsNames] and [TableSource#getFieldsIndices].
             *
             * @tparam T The return type of the [[TableSource]].
             */
           trait TableSource[T] {

          -  /** Returns the number of fields of the table. */
          -  def getNumberOfFields: Int
          -
             /** Returns the names of the table fields. */
          -  def getFieldsNames: Array[String]
          -
          -  /** Returns the types of the table fields. */
          -  def getFieldTypes: Array[TypeInformation[_]]
          +  def getFieldsNames: Array[String] = {
          +    TableEnvironment.getFieldNames(getReturnType)
          +  }
          +
          +  /** Returns the indices of the table fields. */
          +  def getFieldsIndexes: Array[Int] = {

          — End diff –

          The plural of index should be indices.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93818063

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala —
          @@ -44,14 +44,14 @@ abstract class FlinkTable[T](

             val fieldTypes: Array[TypeInformation[_]] =
               typeInfo match {
          -      case cType: CompositeType[T] =>
          +      case cType: CompositeType[_] =>

          — End diff –

          The fieldTypes can be obtained from `TableEnvironment.getFieldTypes(typeInfo)`, and the length validation could be placed above.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93818032

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +509,74 @@ object TableEnvironment {
               new ScalaStreamTableEnv(executionEnvironment, tableConfig)
             }

          +
          +  /**
          +    * Returns field names and field positions for a given [[TypeInformation]].
          +    *
          +    * Field names are automatically extracted for
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
          +    * The method fails if inputType is not a
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
          +    *
          +    * @param inputType The TypeInformation extract the field names and positions from.
          +    * @tparam A The type of the TypeInformation.
          +    * @return A tuple of two arrays holding the field names and corresponding field positions.
          +    */
          +  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
          +    validateType(inputType)
          +
          +    val fieldNames: Array[String] = inputType match {
          +      case t: TupleTypeInfo[A] => t.getFieldNames
          +      case c: CaseClassTypeInfo[A] => c.getFieldNames
          +      case p: PojoTypeInfo[A] => p.getFieldNames
          +      case r: RowTypeInfo => r.getFieldNames
          +      case tpe =>
          +        throw new TableException(s"Type $tpe lacks explicit field naming")
          +    }
          +    val fieldIndexes = fieldNames.indices.toArray
          +
          +    if (fieldNames.contains("*")) {
          +      throw new TableException("Field name can not be '*'.")
          +    }
          +
          +    (fieldNames, fieldIndexes)
          +  }
          +
          +  def validateType(typeInfo: TypeInformation[_]): Unit = {
          +    val clazz = typeInfo.getTypeClass
          +    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
          +      !Modifier.isPublic(clazz.getModifiers) ||
          +      clazz.getCanonicalName == null) {
          +      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
          +        s"static and globally accessible.")
          +    }
          +  }
          +
          +  /**
          +    * Returns field types for a given [[TypeInformation]].
          +    *
          +    * Field types are automatically extracted for
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
          +    * The method fails if inputType is not a
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].

          — End diff –

          The comment should be updated since AtomicType is also supported.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93818113

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +510,77 @@ object TableEnvironment {
               new ScalaStreamTableEnv(executionEnvironment, tableConfig)
             }

          +
          +  /**
          +    * Returns field names and field positions for a given [[TypeInformation]].
          +    *
          +    * Field names are automatically extracted for
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
          +    * The method fails if inputType is not a
          +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
          +    *
          +    * @param inputType The TypeInformation extract the field names and positions from.
          +    * @tparam A The type of the TypeInformation.
          +    * @return A tuple of two arrays holding the field names and corresponding field positions.
          +    */
          +  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
          +    validateType(inputType)
          +
          +    val fieldNames: Array[String] = inputType match {
          +      case t: CompositeType[_] => t.getFieldNames
          +      case a: AtomicType[_] => Array("f0")
          +      case tpe =>
          +        throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
          +          s"Type $tpe lacks explicit field naming")
          +    }
          +
          +    if (fieldNames.contains("*")) {
          +      throw new TableException("Field name can not be '*'.")
          +    }
          +
          +    fieldNames
          +  }
          +
          +  /**
          +    * Validate if class represented by the typeInfo is static and globally accessible
          +    * @param typeInfo type to check
          +    * @throws TableException if type does not meet these criteria
          +    */
          +  def validateType(typeInfo: TypeInformation[_]): Unit = {
          +    val clazz = typeInfo.getTypeClass
          +    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
          +      !Modifier.isPublic(clazz.getModifiers) ||
          +      clazz.getCanonicalName == null) {
          +      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
          +        s"static and globally accessible.")
          +    }
          +  }
          +
          +  def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = {

          — End diff –

          It would be better to add a comment above since it is a public method.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93817904

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java —
          @@ -111,24 +112,17 @@
           		return kafkaSource;
           	}

          -	@Override
          -	public int getNumberOfFields() {
          -		return fieldNames.length;
          -	}
          -
           	@Override
           	public String[] getFieldsNames() {
          -		return fieldNames;
          +		return TableSource$class.getFieldsNames(this);

          — End diff –

          It would be better to comment on why we do this.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r93943868

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java —
          @@ -111,24 +112,17 @@
           		return kafkaSource;
           	}

          -	@Override
          -	public int getNumberOfFields() {
          -		return fieldNames.length;
          -	}
          -
           	@Override
           	public String[] getFieldsNames() {
          -		return fieldNames;
          +		return TableSource$class.getFieldsNames(this);

          — End diff –

          Hi,

          I'm not sure about implementing this as a Scala trait with implemented methods. IMO, this makes it much harder to implement TableSources in Java (especially for users who are not familiar with Scala and its implications).

          What do you think about implementing `TableSource` as an abstract class and providing three other abstract classes that extend `TableSource` with the batch interface, the stream interface, and both interfaces?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3039

          Thanks for working on this @mushketyk and for the reviews @wuchong.
          I just added a comment regarding the Scala trait with implemented functions. I'll do a more thorough review in the next few days.

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/3039

          Hi @fhueske, @wuchong

          Thank you for your reviews.

          I think creating three abstract classes is a good idea: since we don't expect any new types of table sources, there will not be many combinations.

          I'll try to update the PR tomorrow according to all the current comments.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r94087339

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -268,23 +268,9 @@ object UserDefinedFunctionUtils {
          def getFieldInfo(inputType: TypeInformation[_])
          — End diff –

          `getFieldInfo` is overridden in `StreamTableEnvironment`, so I cannot make it a static method.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r94095173

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -268,23 +268,9 @@ object UserDefinedFunctionUtils {
          def getFieldInfo(inputType: TypeInformation[_])
          — End diff –

          I think we can refactor this: provide a `getFieldInfo` static method, get the field names from `getFieldInfo`, and then do the additional check outside. What do you think?
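
          A minimal sketch of that refactoring, assuming the simplified name extraction discussed in this PR (the object name and method placement here are illustrative, not the final API):

          ```scala
          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.api.common.typeutils.CompositeType

          object FieldInfoUtil {
            /** Shared, static extraction of field names. */
            def getFieldNames(inputType: TypeInformation[_]): Array[String] = inputType match {
              case c: CompositeType[_] => c.getFieldNames
              case _ => Array("f0") // atomic types get a generated default name
            }

            /** Static getFieldInfo; callers such as StreamTableEnvironment can run
              * their additional checks around this call instead of overriding it. */
            def getFieldInfo(inputType: TypeInformation[_]): (Array[String], Array[Int]) = {
              val names = getFieldNames(inputType)
              (names, names.indices.toArray)
            }
          }
          ```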

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/3039

          Hi @fhueske, @wuchong

          I've updated my PR according to your reviews.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/3039

          Hi @mushketyk, thanks for the update.

          Regarding `TableSource`: currently there are 8 `TableSource` interfaces provided to users, counting both traits and abstract classes, and it is a little hard to choose which one to use when implementing a custom table source. What about implementing `TableSource` as an abstract class with default implementations for `getFieldNames()` and `getFieldsIndices()`, and providing three other abstract classes that all extend `TableSource`: `BatchTableSource` (with the `getDataSet` interface), `StreamTableSource` (with the `getDataStream` interface), and `BatchStreamTableSource` (with both interfaces)? In this way, we only expose 4 classes, and implementers extend one of the latter three abstract classes. What do you think?
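
          A sketch of the 4-class layout proposed here, with method shapes assumed from the PR discussion (illustrative, not the final API):

          ```scala
          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
          import org.apache.flink.streaming.api.datastream.DataStream
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

          abstract class TableSource[T] {
            def getReturnType: TypeInformation[T]
            // Default schema derivation; simplified stand-in for the PR's logic.
            def getFieldNames: Array[String] = Array("f0")
            def getFieldsIndices: Array[Int] = getFieldNames.indices.toArray
          }

          abstract class BatchTableSource[T] extends TableSource[T] {
            def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
          }

          abstract class StreamTableSource[T] extends TableSource[T] {
            def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
          }

          abstract class BatchStreamTableSource[T] extends TableSource[T] {
            def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
            def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
          }
          ```

          The drawback raised in the reply below is visible in the types: `BatchStreamTableSource` is not a subtype of `BatchTableSource` or `StreamTableSource`.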

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/3039

          Hi @wuchong

          >> And provide three other abstract class : BatchTableSource (with getDataSet interface) , StreamTableSource (with getDataStream interface) and BatchStreamTableSource (with both interfaces), they all extend TableSource

          I thought about this, but in that case we won't be able to use an instance of `BatchStreamTableSource` where an instance of `BatchTableSource` or `StreamTableSource` is expected, which seems to make the `BatchStreamTableSource` abstract class useless.

          I believe that we should be able to use a `BatchStreamTableSource` wherever a `TableSource`, `BatchTableSource`, or `StreamTableSource` is expected, and this requires multiple inheritance, which is only possible with traits. But since we want to provide partial implementations of these types, I've added several abstract classes for users to inherit from.

          I see the issue with the current approach, but I am not sure how to simplify it while still achieving all the required goals. Would better documentation do the trick?

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/3039

          What about extracting `getDataSet(ExecutionEnvironment)` and `getDataStream(StreamExecutionEnvironment)` into interfaces called something like `DataSetGetter` and `DataStreamGetter`?

          Then we can make `BatchTableSource` extend the `TableSource` abstract class and implement `DataSetGetter`, make `StreamTableSource` extend the `TableSource` abstract class and implement `DataStreamGetter`, and make `BatchStreamTableSource` implement both `DataSetGetter` and `DataStreamGetter`. That way we can accept a `TableSource` plus a `DataSetGetter` where only a `BatchTableSource` was expected before. For example, `BatchTableSourceScan` could be changed to something like this:

          ```scala
          class BatchTableSourceScan(
              cluster: RelOptCluster,
              traitSet: RelTraitSet,
              table: RelOptTable,
              val tableSource: TableSource[_],
              val datasetGetter: DataSetGetter)
          ```

          Can this solve our problem?
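
          A sketch of this capability-interface split, with stub definitions added so it stands alone (names from the comment; the exact signatures are assumptions):

          ```scala
          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
          import org.apache.flink.streaming.api.datastream.DataStream
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

          abstract class TableSource[T] {
            def getReturnType: TypeInformation[T]
          }

          // Capability interfaces: each carries exactly one scan method.
          trait DataSetGetter[T] {
            def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
          }
          trait DataStreamGetter[T] {
            def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
          }

          abstract class BatchTableSource[T] extends TableSource[T] with DataSetGetter[T]
          abstract class StreamTableSource[T] extends TableSource[T] with DataStreamGetter[T]
          abstract class BatchStreamTableSource[T] extends TableSource[T]
            with DataSetGetter[T] with DataStreamGetter[T]
          ```

          Call sites that only need batch scanning can then require a `TableSource[_]` plus a `DataSetGetter[_]`, which any batch-capable source satisfies.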

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r94851899

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala —
          @@ -0,0 +1,30 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.sources
          +
          +/**
          + * Partial implementation of the [[BatchStreamTableSource]] trait.
          + *
          + * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]].
          + */
          +abstract class AbstractBatchStreamTableSource[T]
          + extends AbstractTableSource[T]
          + with BatchStreamTableSource[T] {
          — End diff –

          Can't we extend from `BatchTableSource` and `StreamTableSource` instead of having an additional `BatchStreamTableSource`?
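
          In other words, something along these lines (a sketch of the suggestion, not the merged code):

          ```scala
          // Mix in the two existing traits directly instead of introducing a
          // dedicated BatchStreamTableSource type.
          abstract class AbstractBatchStreamTableSource[T]
            extends AbstractTableSource[T]
            with BatchTableSource[T]
            with StreamTableSource[T]
          ```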

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r94852514

          — Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java —
          @@ -92,4 +90,25 @@ public void testBatchTableSourceSQL() throws Exception {
            compareResultAsText(results, expected);
          }

          + @Test
          + public void testNestedBatchTableSourceSQL() throws Exception {
          — End diff –

          The Scala SQL ITCase should be sufficient to test this feature.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r94857414

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -31,7 +31,7 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeIn
          import org.apache.flink.api.common.typeutils.CompositeType
          import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
          import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
          -import org.apache.flink.table.api.TableConfig
          +import org.apache.flink.table.api.{TableConfig, TableEnvironment}

          — End diff –

          Can this import be removed?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r94851171

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -19,21 +19,28 @@
          package org.apache.flink.table.sources

          import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.TableEnvironment

          -/** Defines an external table by providing schema information, i.e., field names and types.
          +/** Defines an external table by providing schema information and used to produce a
          + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
          + * Schema information consists of a data type, field names, and corresponding indices of
          + * these names in the data type.
          + *
          + * To define a TableSource one need to implement [TableSource#getReturnType]. In this case
          + * field names and field indices are derived from the returned type.
          + *
          + * In case if custom field names are required one need to implement both
          + * [TableSource#getFieldsNames] and [TableSource#getFieldsIndices].
           *
           * @tparam T The return type of the [[TableSource]].
           */
          trait TableSource[T] {
          — End diff –

          Please rename `getFieldsNames()` to `getFieldNames()` (the original `getFieldsNames()` looks like a typo to me) and `getFieldsIndicies()` to `getFieldIndicies()`.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r94859631

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +511,92 @@ object TableEnvironment
              new ScalaStreamTableEnv(executionEnvironment, tableConfig)
            }

          +
          + /**
          + * Returns field names for a given [[TypeInformation]].
          + *
          + * Field names are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]]
          + * or [[org.apache.flink.api.common.typeinfo.AtomicType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]]
          + * or [[org.apache.flink.api.common.typeinfo.AtomicType]].
          + *
          + * @param inputType The TypeInformation extract the field names.
          + * @tparam A The type of the TypeInformation.
          + * @return A an array holding the field names
          + */
          + def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
          + validateType(inputType)
          +
          + val fieldNames: Array[String] = inputType match {
          +   case t: CompositeType[_] => t.getFieldNames
          +   case a: AtomicType[_] => Array("f0")
          +   case tpe =>
          +     throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
          +       s"Type $tpe lacks explicit field naming")
          + }
          +
          + if (fieldNames.contains("*")) {
          +   throw new TableException("Field name can not be '*'.")
          + }
          +
          + fieldNames
          + }
          +
          + /**
          + * Validate if class represented by the typeInfo is static and globally accessible
          + * @param typeInfo type to check
          + * @throws TableException if type does not meet these criteria
          + */
          + def validateType(typeInfo: TypeInformation[_]): Unit = {
          + val clazz = typeInfo.getTypeClass
          + if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
          + !Modifier.isPublic(clazz.getModifiers) ||
          + clazz.getCanonicalName == null) {
          +   throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
          +     s"static and globally accessible.")
          + }
          + }
          +
          + /**
          + * Returns field indexes for a given [[TypeInformation]].
          + *
          + * Field indexes are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]]
          + * or [[org.apache.flink.api.common.typeinfo.AtomicType]].
          + * The method fails if inputType is not a
          — End diff –

          No need to mention this, IMO.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r94853135

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala —
          @@ -60,4 +67,52 @@ object CommonTestData
              ignoreComments = "%"
            )
          }

          +
          + def getNestedTableSource: BatchTableSource[Person] = {
          + new AbstractBatchTableSource[Person] {
          + override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Person] = {
          +   val executionEnvironment = ExecutionEnvironment.getExecutionEnvironment
          +   executionEnvironment.fromCollection(
          +     util.Arrays.asList(
          +       new Person("Mike", "Smith", new Address("5th Ave", "New-York")),
          +       new Person("Sally", "Miller", new Address("Potsdamer Platz", "Berlin")),
          +       new Person("Bob", "Taylor", new Address("Pearse Street", "Dublin"))),
          +     getReturnType
          +   )
          + }

          +
          + /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */
          + override def getReturnType: TypeInformation[Person] = new PojoTypeInfo[Person](
          — End diff –

          `TypeExtractor.getForClass(Person)` should return a `PojoTypeInfo[Person]`. No need to fiddle with Java reflection.
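
          The suggested alternative might look like this sketch, placed inside the anonymous table source (it assumes `Person` is the test POJO from this diff and follows Flink's POJO rules):

          ```scala
          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.api.java.typeutils.TypeExtractor

          // TypeExtractor analyzes the class and returns a PojoTypeInfo[Person],
          // so the field list does not have to be assembled by hand.
          override def getReturnType: TypeInformation[Person] =
            TypeExtractor.getForClass(classOf[Person])
          ```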

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r94858739

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +511,92 @@ object TableEnvironment
              new ScalaStreamTableEnv(executionEnvironment, tableConfig)
            }

          +
          + /**
          + * Returns field names for a given [[TypeInformation]].
          + *
          + * Field names are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]]
          + * or [[org.apache.flink.api.common.typeinfo.AtomicType]].
          + * The method fails if inputType is not a
          — End diff –

          I don't think we need to mention this. All Flink types are either composite or atomic (even though this is not strictly enforced).

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r94856759

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala —
          @@ -120,6 +124,7 @@ trait FlinkRel {
          case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
          case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
          case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
          + case SqlTypeName.ROW => s + estimateRowSize(fieldList.get(0).getType()).asInstanceOf[Int]
          — End diff –

          Shouldn't `get(0)` access the type at the index of the current field that is folded, instead of always the first?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r94859657

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -535,4 +511,92 @@ object TableEnvironment
              new ScalaStreamTableEnv(executionEnvironment, tableConfig)
            }

          +
          + /**
          + * Returns field names for a given [[TypeInformation]].
          + *
          + * Field names are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]]
          + * or [[org.apache.flink.api.common.typeinfo.AtomicType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]]
          + * or [[org.apache.flink.api.common.typeinfo.AtomicType]].
          + *
          + * @param inputType The TypeInformation extract the field names.
          + * @tparam A The type of the TypeInformation.
          + * @return A an array holding the field names
          + */
          + def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
          + validateType(inputType)
          +
          + val fieldNames: Array[String] = inputType match {
          +   case t: CompositeType[_] => t.getFieldNames
          +   case a: AtomicType[_] => Array("f0")
          +   case tpe =>
          +     throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
          +       s"Type $tpe lacks explicit field naming")
          + }
          +
          + if (fieldNames.contains("*")) {
          +   throw new TableException("Field name can not be '*'.")
          + }
          +
          + fieldNames
          + }
          +
          + /**
          + * Validate if class represented by the typeInfo is static and globally accessible
          + * @param typeInfo type to check
          + * @throws TableException if type does not meet these criteria
          + */
          + def validateType(typeInfo: TypeInformation[_]): Unit = {
          + val clazz = typeInfo.getTypeClass
          + if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
          + !Modifier.isPublic(clazz.getModifiers) ||
          + clazz.getCanonicalName == null) {
          +   throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
          +     s"static and globally accessible.")
          + }
          + }
          +
          + /**
          + * Returns field indexes for a given [[TypeInformation]].
          + *
          + * Field indexes are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]]
          + * or [[org.apache.flink.api.common.typeinfo.AtomicType]].
          + * The method fails if inputType is not a
          + * [[org.apache.flink.api.common.typeutils.CompositeType]]
          + * or [[org.apache.flink.api.common.typeinfo.AtomicType]].
          + *
          + * @param inputType The TypeInformation extract the field positions from.
          + * @return A an array holding the field positions
          + */
          + def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = {
          +   getFieldNames(inputType).indices.toArray
          + }

          +
          + /**
          + * Returns field types for a given [[TypeInformation]].
          + *
          + * Field types are automatically extracted for
          + * [[org.apache.flink.api.common.typeutils.CompositeType]]
          + * or [[org.apache.flink.api.common.typeinfo.AtomicType]].
          + * The method fails if inputType is not a
          — End diff –

          No need to mention this, IMO.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r94865876

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/AbstractBatchStreamTableSource.scala —
          @@ -0,0 +1,30 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.sources
          +
          +/**
          + * Partial implementation of the [[BatchStreamTableSource]] trait.
          + *
          + * @tparam T Type of the [[org.apache.flink.api.java.DataSet]] created by this [[TableSource]].
          + */
          +abstract class AbstractBatchStreamTableSource[T]
          + extends AbstractTableSource[T]
          + with BatchStreamTableSource[T] {
          — End diff –

          Good point. I'll update this.

          What do you think about the overall design of the TableSource-related classes?

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/3039

          Hi @fhueske, @wuchong

          I think we can solve the problem with the current `TableSource` class hierarchy in a different way. If we remove all the abstract classes and move all default implementations into the `TableSource` trait, the class hierarchy becomes much simpler. The only drawback is that Java users will need to provide implementations of trait methods that explicitly [call default implementations](http://stackoverflow.com/a/7637888).

          I don't think this is bad, since it's a common way to extend Scala traits. We can additionally remove `getFieldNames` and `getFieldIndices` if you think they are superfluous, but I don't think it makes a big difference.

          What do you think @fhueske, @wuchong ?

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/3039

          I'm fine with removing `getFieldNames` and `getFieldIndices`. The Scala trait approach makes it too hard for Java users who do not know this trick to implement a custom `TableSource`.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3039

          I just had a crazy thought.
          What do you think about moving `getFieldNames()` and `getFieldIndicies()` into a separate trait / interface, maybe `DefinedFieldNames`?

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/3039

          I don't think I got the idea. Could you elaborate on it?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3039

          We remove the methods from `TableSource` and add them to an interface. If a table source does not implement the methods, we use the names provided by the `TypeInformation`. If the table source implements the methods, we use those names. The distinction is made in `TableSourceTable`.

          What do you think?
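
          A minimal sketch of that idea, assuming the names used in this thread (`tableSource` stands in for the source held by `TableSourceTable`):

          ```scala
          import org.apache.flink.table.api.TableEnvironment

          // Optional interface: only table sources that need custom field names implement it.
          trait DefinedFieldNames {
            def getFieldNames: Array[String]
            def getFieldIndices: Array[Int]
          }

          // Inside TableSourceTable (illustrative): prefer explicit names when the
          // interface is implemented, otherwise derive the names from the return type.
          val fieldNames: Array[String] = tableSource match {
            case d: DefinedFieldNames => d.getFieldNames
            case _ => TableEnvironment.getFieldNames(tableSource.getReturnType)
          }
          ```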

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/3039

          I don't think we will win a lot with this. Even if we remove these two methods from the `TableSource` trait, there is still the `getTypeIndices` method, and Java users will still have to call it if they are going to implement the `TableSource` trait. And if a user knows how to inherit a trait with one method, he/she will be able to inherit a trait with three methods.

          The second problem with this approach is that it's not really object-oriented. We would have to rely on reflection tricks (probably sugared with pattern matching), while simply having three methods is a cleaner OO solution.

          What if we leave all three methods and simply add some base Java implementations that already implement these traits? Something like `JavaBatchTableSource`, `JavaStreamTableSource`, and `JavaBatchStreamTableSource`? Then users will not need to struggle with trait inheritance issues.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3039

          Which `getTypeIndicies` method are you referring to? `TableSource` only has `getReturnType`, `getFieldNames`, and `getFieldIndicies`. If we move the latter two to a separate interface, only `getReturnType` is left.

          Also, I think this is typical OO design. We do not need reflection to check whether an object implements an interface; that's a very common operation in Java and Scala. A simple `isInstanceOf[DefinesFieldNames]` check in `TableSourceTable` is sufficient to determine whether the table source implements the interface or not.

          Isn't this a good compromise of having a lean interface (also simple for Java users) and at the same time the possibility to override field names if necessary?

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/3039

          @fhueske

          Sorry, there are only two methods. Please ignore my comment.

          I think you are right and this seems like a good approach. If @wuchong is on board with this, I'll update the PR accordingly.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/3039

          I like this idea. In this way, we only need to provide the `BatchTableSource` and `StreamTableSource` interfaces, without involving the odd `BatchStreamTableSource`. We can keep the interface very clean.

          If I understand correctly, none of the concrete implementations of `TableSource` will implement `DefinesFieldNames` for now?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3039

          @mushketyk no worries

          @wuchong Since the methods are only added when needed by implementing the interface, there is no default implementation. The logic of the default implementation (calling the static method of `TableEnvironment`) is put directly into `TableSourceTable` and is only replaced if the table source implements the new interface.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/3039

          Makes sense to me 😄

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/3039

          @fhueske @wuchong Makes sense to me as well.

          I'll try to update the PR during the weekend.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/3039

          Hi @fhueske, @wuchong

          I've updated the PR according to your feedback.
          Could you please review it again?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r95178608

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala —
          @@ -53,7 +53,9 @@ class CsvTableSource(
          ignoreFirstLine: Boolean = false,
          ignoreComments: String = null,
          lenient: Boolean = false)

          - extends AbstractBatchStreamTableSource[Row]
          + extends BatchTableSource[Row]
          + with StreamTableSource[Row]
          + with DefinedFieldNames
          — End diff –

          If we define `returnType` as `new RowTypeInfo(fieldTypes, fieldNames)`, we do not need to implement `DefinedFieldNames`.
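
          In code, the suggestion might look like this (a sketch inside `CsvTableSource`; `fieldTypes` and `fieldNames` are its constructor parameters):

          ```scala
          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.api.java.typeutils.RowTypeInfo
          import org.apache.flink.types.Row

          // Baking the field names into the RowTypeInfo lets the default name
          // extraction produce the right schema, so DefinedFieldNames is not needed.
          override def getReturnType: TypeInformation[Row] =
            new RowTypeInfo(fieldTypes, fieldNames)
          ```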

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r95292990

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -19,22 +19,23 @@
          package org.apache.flink.table.sources

          import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.TableEnvironment

          -/** Defines an external table by providing schema information, i.e., field names and types.
          +/** Defines an external table by providing schema information and used to produce a
          + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
          + * Schema information consists of a data type, field names, and corresponding indices of
          + * these names in the data type.
          + *
          + * To define a TableSource one need to implement [TableSource#getReturnType]. In this case
          + * field names and field indices are derived from the returned type.
          + *
          + * In case if custom field names are required one need to additionally implement
          — End diff –

          In case if -> In case of

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r95333198

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -19,22 +19,23 @@
          package org.apache.flink.table.sources

          import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.TableEnvironment

          -/** Defines an external table by providing schema information, i.e., field names and types.
          +/** Defines an external table by providing schema information and used to produce a
          + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
          + * Schema information consists of a data type, field names, and corresponding indices of
          + * these names in the data type.
          + *
          + * To define a TableSource one need to implement [TableSource#getReturnType]. In this case
          + * field names and field indices are derived from the returned type.
          + *
          + * In case if custom field names are required one need to additionally implement
          — End diff –

          I am not sure about this. I've checked it with [Grammarly](grammarly.com) and it does not complain about "In case if", but it complains about "in case of".

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3039#discussion_r95339229

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala —
          @@ -19,22 +19,23 @@
          package org.apache.flink.table.sources

          import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.TableEnvironment

          -/** Defines an external table by providing schema information, i.e., field names and types.
          +/** Defines an external table by providing schema information and used to produce a
          + * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
          + * Schema information consists of a data type, field names, and corresponding indices of
          + * these names in the data type.
          + *
          + * To define a TableSource one need to implement [TableSource#getReturnType]. In this case
          + * field names and field indices are derived from the returned type.
          + *
          + * In case if custom field names are required one need to additionally implement
          — End diff –

          My mistake, `In case if` is fine here.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3039

          Merging

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/3039

          Hi @fhueske, @wuchong

          Thank you for your reviews and your help with this PR.
          I've updated the PR.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3039

          fhueske Fabian Hueske added a comment -

          Fixed for 1.2 with a504abe4656e104e6b63db001542f3180e191740
          Fixed for 1.3 with 38ded2bb00aeb5c9581fa7ef313e5b9f803f5c26


            People

            • Assignee:
              ivan.mushketyk Ivan Mushketyk
            • Reporter:
              fhueske Fabian Hueske
            • Votes:
              1
            • Watchers:
              6