[FLINK-5348] Support custom field names for RowTypeInfo

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Core
    • Labels: None

      Description

      Currently, RowTypeInfo does not support custom field names; it always generates f0 ~ fn as field names. Supporting optional custom names would be better and would benefit some cases (e.g. FLINK-5280).
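
To illustrate the request, here is a minimal, self-contained sketch of the naming behaviour. It does not use Flink: `FieldNameSketch` and `resolveFieldNames` are hypothetical names, and the only things taken from this issue are the default `f0 ~ fn` scheme and the idea of optional custom names.

```java
import java.util.Arrays;

// Hypothetical stand-in, not Flink code: default vs. custom field names.
final class FieldNameSketch {

    // Returns a copy of the custom names if given; otherwise generates
    // f0, f1, ..., f(arity-1).
    static String[] resolveFieldNames(int arity, String[] customNames) {
        if (customNames != null) {
            if (customNames.length != arity) {
                throw new IllegalArgumentException(
                    "Number of field types and names is different.");
            }
            return customNames.clone();
        }
        String[] names = new String[arity];
        for (int i = 0; i < arity; i++) {
            names[i] = "f" + i;
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(resolveFieldNames(3, null)));
        System.out.println(Arrays.toString(resolveFieldNames(2, new String[] {"id", "name"})));
    }
}
```

In this sketch, passing `null` keeps the generated names, so callers that never supply names see no change in behaviour.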

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user wuchong opened a pull request:

          https://github.com/apache/flink/pull/3020

          FLINK-5348 [core] Support custom field names for RowTypeInfo

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [x] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [x] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [x] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/wuchong/flink row-FLINK-5348

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3020.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3020


          commit dd470ec4a77155ece90b24b54dc6695df5125aa6
          Author: Jark Wu <wuchong.wc@alibaba-inc.com>
          Date: 2016-12-16T09:24:01Z

          FLINK-5348 [core] Support custom field names for RowTypeInfo


          githubbot ASF GitHub Bot added a comment -

          Github user tonycox commented on the issue:

          https://github.com/apache/flink/pull/3020

          Hi @wuchong, in this case I think you should adapt `getFieldIndex`.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/3020

          Hi @tonycox, the field names of RowTypeInfo are guaranteed to be unique, so I think `getFieldIndex` works fine with them. Do you have a specific case in which `getFieldIndex` gives a wrong result?
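
The argument above can be sketched as follows. This is a self-contained illustration, not Flink code: `UniqueNamesSketch` is a hypothetical class, and returning -1 for an unknown name is an assumption of the sketch. The uniqueness check mirrors the `HashSet`-based check in the PR's constructor.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Hypothetical stand-in, not Flink code.
final class UniqueNamesSketch {
    private final String[] fieldNames;

    UniqueNamesSketch(List<String> names) {
        // A set drops duplicates, so a smaller set means that at least
        // one name occurs more than once.
        if (new HashSet<>(names).size() != names.size()) {
            throw new IllegalArgumentException("Field names are not unique.");
        }
        this.fieldNames = names.toArray(new String[0]);
    }

    // Linear scan. Because names are unique, the first match is the only
    // match. Returns -1 for an unknown name (assumption of this sketch).
    int getFieldIndex(String name) {
        for (int i = 0; i < fieldNames.length; i++) {
            if (fieldNames[i].equals(name)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        UniqueNamesSketch row = new UniqueNamesSketch(Arrays.asList("id", "name"));
        System.out.println(row.getFieldIndex("name"));
    }
}
```

Once duplicates are rejected at construction time, a name lookup can never be ambiguous, which is why the lookup itself needs no further changes in this sketch.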

          githubbot ASF GitHub Bot added a comment -

          Github user tonycox commented on the issue:

          https://github.com/apache/flink/pull/3020

          @wuchong sorry, my mistake: I didn't spot the duplicate field name checks. What about letter case? Is flink-table case-sensitive?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3020#discussion_r93405238

          --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java ---
          @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
                   }
               }

          +    public RowTypeInfo(List<TypeInformation<?>> types, List<String> fieldNames) {
          +        super(Row.class, types == null ? null : types.toArray(new TypeInformation[types.size()]));
          +        checkNotNull(fieldNames, "FieldNames should not be null.");
          +        checkArgument(
          +            types.size() == fieldNames.size(),
          +            "Number of field types and names is different.");
          +        checkArgument(
          +            types.size() == new HashSet<>(fieldNames).size(),
          +            "Field names are not unique.");
          +
          +        this.fieldNames = new String[fieldNames.size()];
          +
          +        for (int i = 0; i < fieldNames.size(); i++) {
          +            this.fieldNames[i] = fieldNames.get(i);
          +        }
          +    }
          +
          +    @Override
          +    public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
          +        Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
          +
          +        if (!matcher.matches()) {
          +            throw new InvalidFieldReferenceException(
          +                "Invalid tuple field reference \"" + fieldExpression + "\".");
          +        }
          +
          +        String field = matcher.group(0);
          +
          +        if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
          +            (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
          +            // handle select all
          +            int keyPosition = 0;
          +            for (TypeInformation<?> fType : types) {
          +                if (fType instanceof CompositeType) {
          +                    CompositeType<?> cType = (CompositeType<?>) fType;
          +                    cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result);
          +                    keyPosition += cType.getTotalFields() - 1;
          +                } else {
          +                    result.add(new FlatFieldDescriptor(offset + keyPosition, fType));
          +                }
          +                keyPosition++;
          +            }
          +        } else {
          +            field = matcher.group(1);
          +
          +            Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
          +            TypeInformation<?> fieldType = null;
          +            if (intFieldMatcher.matches()) {
          +                // field expression is an integer
          +                int fieldIndex = Integer.valueOf(field);
          +                if (fieldIndex > this.getArity()) {
          +                    throw new InvalidFieldReferenceException(
          +                        "Row field expression \"" + field + "\" out of bounds of " + this.toString() + ".");
          +                }
          +                for (int i = 0; i < fieldIndex; i++) {
          +                    offset += this.getTypeAt(i).getTotalFields();
          +                }
          +                fieldType = this.getTypeAt(fieldIndex);
          +            } else {
          +                for (int i = 0; i < this.fieldNames.length; i++) {
          +                    if (fieldNames[i].equals(field)) {
          +                        // found field
          +                        fieldType = this.getTypeAt(i);
          +                        break;
          +                    }
          +                    offset += this.getTypeAt(i).getTotalFields();
          +                }
          +                if (fieldType == null) {
          +                    throw new InvalidFieldReferenceException(
          +                        "Unable to find field \"" + field + "\" in type " + this.toString() + ".");
          +                }
          +            }
          +
          +            String tail = matcher.group(3);
          +
          +            if (tail == null) {
          +                // expression hasn't nested field
          +                if (fieldType instanceof CompositeType) {
          +                    ((CompositeType) fieldType).getFlatFields("*", offset, result);
          +                } else {
          +                    result.add(new FlatFieldDescriptor(offset, fieldType));
          +                }
          +            } else {
          +                // expression has nested field
          +                if (fieldType instanceof CompositeType) {
          +                    ((CompositeType) fieldType).getFlatFields(tail, offset, result);
          +                } else {
          +                    throw new InvalidFieldReferenceException(
          +                        "Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + ".");
          +                }
          +            }
          +        }
          +    }
          +
          +    @Override
          +    public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
          --- End diff --

          This is the same logic as in `CaseClassTypeInfo.getTypeAt()` only ported to Java, right?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3020#discussion_r93465353

          --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java ---
          @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
                   }
               }

          +    public RowTypeInfo(List<TypeInformation<?>> types, List<String> fieldNames) {
          +        super(Row.class, types == null ? null : types.toArray(new TypeInformation[types.size()]));
          +        checkNotNull(fieldNames, "FieldNames should not be null.");
          +        checkArgument(
          +            types.size() == fieldNames.size(),
          +            "Number of field types and names is different.");
          +        checkArgument(
          +            types.size() == new HashSet<>(fieldNames).size(),
          +            "Field names are not unique.");
          +
          +        this.fieldNames = new String[fieldNames.size()];
          +
          +        for (int i = 0; i < fieldNames.size(); i++) {
          +            this.fieldNames[i] = fieldNames.get(i);
          +        }
          +    }
          +
          +    @Override
          +    public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
          +        Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
          +
          +        if (!matcher.matches()) {
          +            throw new InvalidFieldReferenceException(
          +                "Invalid tuple field reference \"" + fieldExpression + "\".");
          +        }
          +
          +        String field = matcher.group(0);
          +
          +        if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
          +            (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
          +            // handle select all
          +            int keyPosition = 0;
          +            for (TypeInformation<?> fType : types) {
          +                if (fType instanceof CompositeType) {
          +                    CompositeType<?> cType = (CompositeType<?>) fType;
          +                    cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result);
          +                    keyPosition += cType.getTotalFields() - 1;
          +                } else {
          +                    result.add(new FlatFieldDescriptor(offset + keyPosition, fType));
          +                }
          +                keyPosition++;
          +            }
          +        } else {
          +            field = matcher.group(1);
          +
          +            Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
          +            TypeInformation<?> fieldType = null;
          +            if (intFieldMatcher.matches()) {
          +                // field expression is an integer
          +                int fieldIndex = Integer.valueOf(field);
          +                if (fieldIndex > this.getArity()) {
          +                    throw new InvalidFieldReferenceException(
          +                        "Row field expression \"" + field + "\" out of bounds of " + this.toString() + ".");
          +                }
          +                for (int i = 0; i < fieldIndex; i++) {
          +                    offset += this.getTypeAt(i).getTotalFields();
          +                }
          +                fieldType = this.getTypeAt(fieldIndex);
          +            } else {
          +                for (int i = 0; i < this.fieldNames.length; i++) {
          +                    if (fieldNames[i].equals(field)) {
          +                        // found field
          +                        fieldType = this.getTypeAt(i);
          +                        break;
          +                    }
          +                    offset += this.getTypeAt(i).getTotalFields();
          +                }
          +                if (fieldType == null) {
          +                    throw new InvalidFieldReferenceException(
          +                        "Unable to find field \"" + field + "\" in type " + this.toString() + ".");
          +                }
          +            }
          +
          +            String tail = matcher.group(3);
          +
          +            if (tail == null) {
          +                // expression hasn't nested field
          +                if (fieldType instanceof CompositeType) {
          +                    ((CompositeType) fieldType).getFlatFields("*", offset, result);
          +                } else {
          +                    result.add(new FlatFieldDescriptor(offset, fieldType));
          +                }
          +            } else {
          +                // expression has nested field
          +                if (fieldType instanceof CompositeType) {
          +                    ((CompositeType) fieldType).getFlatFields(tail, offset, result);
          +                } else {
          +                    throw new InvalidFieldReferenceException(
          +                        "Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + ".");
          +                }
          +            }
          +        }
          +    }
          +
          +    @Override
          +    public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
          +        Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression);
          +        if (!matcher.matches()) {
          +            if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) ||
          +                fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
          +                throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
          +            } else {
          +                throw new InvalidFieldReferenceException("Invalid format of Row field expression \""+fieldExpression+"\".");
          +            }
          +        }
          +
          +        String field = matcher.group(1);
          +
          +        Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
          +        TypeInformation<X> fieldType = null;
          +        if (intFieldMatcher.matches()) {
          +            // field expression is an integer
          +            int fieldIndex = Integer.valueOf(field);
          +            if (fieldIndex > this.getArity()) {
          --- End diff --

          `TupleTypeBase.getFieldAt()` does check for index bounds as well and throws an `IndexOutOfBoundsException`.
          So we could simplify this a bit here.
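
A minimal sketch of that simplification, assuming the positional accessor validates the index as described above. `BoundsCheckSketch` is a hypothetical, self-contained stand-in (types are plain strings), not Flink code. Note that the explicit check in the diff, `fieldIndex > this.getArity()`, lets `fieldIndex == getArity()` through, so in that case the accessor's own check would be what rejects the index.

```java
// Hypothetical stand-in, not Flink code.
final class BoundsCheckSketch {
    private final String[] types;

    BoundsCheckSketch(String... types) {
        this.types = types.clone();
    }

    // Positional accessor that validates the index itself.
    String getTypeAt(int pos) {
        if (pos < 0 || pos >= types.length) {
            throw new IndexOutOfBoundsException(String.valueOf(pos));
        }
        return types[pos];
    }

    // No explicit pre-check here: an out-of-range index fails inside
    // getTypeAt(int).
    String getTypeAt(String fieldExpression) {
        return getTypeAt(Integer.parseInt(fieldExpression));
    }

    public static void main(String[] args) {
        BoundsCheckSketch row = new BoundsCheckSketch("INT", "STRING");
        System.out.println(row.getTypeAt("1"));
    }
}
```

The trade-off is the exception type: relying on the accessor surfaces an `IndexOutOfBoundsException` rather than a field-reference-specific error.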

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3020#discussion_r93405183

          --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java ---
          @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
                   }
               }

          +    public RowTypeInfo(List<TypeInformation<?>> types, List<String> fieldNames) {
          +        super(Row.class, types == null ? null : types.toArray(new TypeInformation[types.size()]));
          +        checkNotNull(fieldNames, "FieldNames should not be null.");
          +        checkArgument(
          +            types.size() == fieldNames.size(),
          +            "Number of field types and names is different.");
          +        checkArgument(
          +            types.size() == new HashSet<>(fieldNames).size(),
          +            "Field names are not unique.");
          +
          +        this.fieldNames = new String[fieldNames.size()];
          +
          +        for (int i = 0; i < fieldNames.size(); i++) {
          +            this.fieldNames[i] = fieldNames.get(i);
          +        }
          +    }
          +
          +    @Override
          +    public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
          --- End diff --

          This is the same logic as in `CaseClassTypeInfo.getFlatFields()` only ported to Java, right?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3020#discussion_r93405538

          --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java ---
          @@ -18,12 +18,86 @@
          package org.apache.flink.api.java.typeutils;

          import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.CompositeType;
          +import org.junit.BeforeClass;
          import org.junit.Test;

          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.List;
          +
          +import static org.junit.Assert.assertArrayEquals;
          import static org.junit.Assert.assertEquals;
          import static org.junit.Assert.assertNotEquals;
          +import static org.junit.Assert.assertTrue;
          +import static org.junit.Assert.fail;

          public class RowTypeInfoTest {
          --- End diff --

          Add tests for `getFlatFields()`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3020#discussion_r93466144

          --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java ---
          @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
                   }
               }

          +    public RowTypeInfo(List<TypeInformation<?>> types, List<String> fieldNames) {
          +        super(Row.class, types == null ? null : types.toArray(new TypeInformation[types.size()]));
          +        checkNotNull(fieldNames, "FieldNames should not be null.");
          +        checkArgument(
          +            types.size() == fieldNames.size(),
          +            "Number of field types and names is different.");
          +        checkArgument(
          +            types.size() == new HashSet<>(fieldNames).size(),
          +            "Field names are not unique.");
          +
          +        this.fieldNames = new String[fieldNames.size()];
          +
          +        for (int i = 0; i < fieldNames.size(); i++) {
          +            this.fieldNames[i] = fieldNames.get(i);
          +        }
          +    }
          +
          +    @Override
          +    public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
          +        Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
          +
          +        if (!matcher.matches()) {
          +            throw new InvalidFieldReferenceException(
          +                "Invalid tuple field reference \"" + fieldExpression + "\".");
          +        }
          +
          +        String field = matcher.group(0);
          +
          +        if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
          +            (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
          +            // handle select all
          +            int keyPosition = 0;
          +            for (TypeInformation<?> fType : types) {
          +                if (fType instanceof CompositeType) {
          +                    CompositeType<?> cType = (CompositeType<?>) fType;
          +                    cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result);
          +                    keyPosition += cType.getTotalFields() - 1;
          +                } else {
          +                    result.add(new FlatFieldDescriptor(offset + keyPosition, fType));
          +                }
          +                keyPosition++;
          +            }
          +        } else {
          +            field = matcher.group(1);
          +
          +            Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
          +            TypeInformation<?> fieldType = null;
          +            if (intFieldMatcher.matches()) {
          +                // field expression is an integer
          +                int fieldIndex = Integer.valueOf(field);
          +                if (fieldIndex > this.getArity()) {
          +                    throw new InvalidFieldReferenceException(
          +                        "Row field expression \"" + field + "\" out of bounds of " + this.toString() + ".");
          +                }
          +                for (int i = 0; i < fieldIndex; i++) {
          +                    offset += this.getTypeAt(i).getTotalFields();
          +                }
          +                fieldType = this.getTypeAt(fieldIndex);
          +            } else {
          --- End diff --

          We could use `getFieldIndex()` here to translate the field name into an index, and then share a common code path with the integer field index to compute the offset and fetch the type.
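
One way this suggestion could look, as a self-contained sketch rather than the change that was actually merged. `FlatOffsetSketch`, its `totalFields` array (the number of flat fields each top-level field occupies), and the `INT_FIELD` pattern are hypothetical stand-ins for the Flink types and constants in the diff.

```java
import java.util.regex.Pattern;

// Hypothetical stand-in, not Flink code.
final class FlatOffsetSketch {
    private static final Pattern INT_FIELD = Pattern.compile("[0-9]+");

    private final String[] fieldNames;
    // totalFields[i] = flat fields occupied by field i (1 for atomic types).
    private final int[] totalFields;

    FlatOffsetSketch(String[] fieldNames, int[] totalFields) {
        this.fieldNames = fieldNames.clone();
        this.totalFields = totalFields.clone();
    }

    // Returns the position of the named field, or -1 if it is unknown.
    int getFieldIndex(String name) {
        for (int i = 0; i < fieldNames.length; i++) {
            if (fieldNames[i].equals(name)) {
                return i;
            }
        }
        return -1;
    }

    // Resolves the field to an index first (by position or by name), then
    // computes the flat offset on a single shared code path.
    int flatOffset(String field, int offset) {
        int fieldIndex = INT_FIELD.matcher(field).matches()
            ? Integer.parseInt(field)
            : getFieldIndex(field);
        if (fieldIndex < 0 || fieldIndex >= fieldNames.length) {
            throw new IllegalArgumentException("Unable to find field \"" + field + "\".");
        }
        for (int i = 0; i < fieldIndex; i++) {
            offset += totalFields[i];
        }
        return offset;
    }

    public static void main(String[] args) {
        FlatOffsetSketch row = new FlatOffsetSketch(
            new String[] {"id", "address", "score"}, new int[] {1, 3, 1});
        System.out.println(row.flatOffset("score", 0));
    }
}
```

With the name resolved up front, the offset loop and the bounds handling exist only once instead of being duplicated across the integer branch and the name branch.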

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3020#discussion_r93465385

          — Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java —
          @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
          }
          }

          + public RowTypeInfo(List<TypeInformation<?>> types, List<String> fieldNames) {
          + super(Row.class, types == null ? null : types.toArray(new TypeInformation[types.size()]));
          + checkNotNull(fieldNames, "FieldNames should not be null.");
          + checkArgument(
          + types.size() == fieldNames.size(),
          + "Number of field types and names is different.");
          + checkArgument(
          + types.size() == new HashSet<>(fieldNames).size(),
          + "Field names are not unique.");
          +
          + this.fieldNames = new String[fieldNames.size()];
          +
          + for (int i = 0; i < fieldNames.size(); i++) {
          + this.fieldNames[i] = fieldNames.get(i);
          + }
          + }
          +
          + @Override
          + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
          + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
          +
          + if (!matcher.matches()) {
          + throw new InvalidFieldReferenceException(
          + "Invalid tuple field reference \"" + fieldExpression + "\".");
          + }
          +
          + String field = matcher.group(0);
          +
          + if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
          + (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
          + // handle select all
          + int keyPosition = 0;
          + for (TypeInformation<?> fType : types) {
          + if (fType instanceof CompositeType) {
          + CompositeType<?> cType = (CompositeType<?>) fType;
          + cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result);
          + keyPosition += cType.getTotalFields() - 1;
          + } else {
          + result.add(new FlatFieldDescriptor(offset + keyPosition, fType));
          + }
          + keyPosition++;
          + }
          + } else {
          + field = matcher.group(1);
          +
          + Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
          + TypeInformation<?> fieldType = null;
          + if (intFieldMatcher.matches()) {
          + // field expression is an integer
          + int fieldIndex = Integer.valueOf(field);
          + if (fieldIndex > this.getArity()) {
          — End diff –

          `TupleTypeBase.getFieldAt()` does check for index bounds as well and throws an `IndexOutOfBoundsException`.
          So we could simplify this a bit here.
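
A small sketch of why the explicit check is redundant. `BoundsCheckSketch` is a hypothetical stand-in, not a Flink class, and it assumes only what the comment above states: that the positional accessor validates the index itself. It also shows that, with 0-based indices, the quoted condition `fieldIndex > getArity()` lets `fieldIndex == getArity()` through, so the accessor's own check is the one that rejects it.

```java
/** Hypothetical stand-in for a tuple-like type whose positional accessor checks bounds itself. */
final class BoundsCheckSketch {
    private final String[] types;

    BoundsCheckSketch(String... types) {
        this.types = types.clone();
    }

    int getArity() {
        return types.length;
    }

    /** Accessor with its own bounds check (assumed behaviour, as described in the review comment). */
    String getTypeAt(int pos) {
        if (pos < 0 || pos >= types.length) {
            throw new IndexOutOfBoundsException("Field index " + pos + " is out of range.");
        }
        return types[pos];
    }

    /** The explicit pre-check as written in the quoted diff; true means the index is rejected. */
    boolean explicitCheckRejects(int fieldIndex) {
        return fieldIndex > getArity();
    }
}
```

For a two-field type, index 2 passes the explicit pre-check but is still rejected by `getTypeAt(2)`, so dropping the pre-check loses no safety in this sketch.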

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3020#discussion_r93466347

          — Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java —
          @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
          }
          }

          + public RowTypeInfo(List<TypeInformation<?>> types, List<String> fieldNames) {
          + super(Row.class, types == null ? null : types.toArray(new TypeInformation[types.size()]));
          + checkNotNull(fieldNames, "FieldNames should not be null.");
          + checkArgument(
          + types.size() == fieldNames.size(),
          + "Number of field types and names is different.");
          + checkArgument(
          + types.size() == new HashSet<>(fieldNames).size(),
          + "Field names are not unique.");
          +
          + this.fieldNames = new String[fieldNames.size()];
          +
          + for (int i = 0; i < fieldNames.size(); i++) {
          + this.fieldNames[i] = fieldNames.get(i);
          + }
          + }
          +
          + @Override
          + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
          + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
          +
          + if (!matcher.matches()) {
          + throw new InvalidFieldReferenceException(
          + "Invalid tuple field reference \"" + fieldExpression + "\".");
          + }
          +
          + String field = matcher.group(0);
          +
          + if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
          + (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
          + // handle select all
          + int keyPosition = 0;
          + for (TypeInformation<?> fType : types) {
          + if (fType instanceof CompositeType) {
          + CompositeType<?> cType = (CompositeType<?>) fType;
          + cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result);
          + keyPosition += cType.getTotalFields() - 1;
          + } else {
          + result.add(new FlatFieldDescriptor(offset + keyPosition, fType));
          + }
          + keyPosition++;
          + }
          + } else {
          + field = matcher.group(1);
          +
          + Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
          + TypeInformation<?> fieldType = null;
          + if (intFieldMatcher.matches()) {
          + // field expression is an integer
          + int fieldIndex = Integer.valueOf(field);
          + if (fieldIndex > this.getArity()) {
          + throw new InvalidFieldReferenceException(
          + "Row field expression \"" + field + "\" out of bounds of " + this.toString() + ".");
          + }
          + for (int i = 0; i < fieldIndex; i++) {
          + offset += this.getTypeAt(i).getTotalFields();
          + }
          + fieldType = this.getTypeAt(fieldIndex);
          + } else {
          + for (int i = 0; i < this.fieldNames.length; i++) {
          + if (fieldNames[i].equals(field)) {
          + // found field
          + fieldType = this.getTypeAt(i);
          + break;
          + }
          + offset += this.getTypeAt(i).getTotalFields();
          + }
          + if (fieldType == null) {
          + throw new InvalidFieldReferenceException(
          + "Unable to find field \"" + field + "\" in type " + this.toString() + ".");
          + }
          + }
          +
          + String tail = matcher.group(3);
          +
          + if (tail == null) {
          + // expression hasn't nested field
          + if (fieldType instanceof CompositeType) {
          + ((CompositeType) fieldType).getFlatFields("*", offset, result);
          + } else {
          + result.add(new FlatFieldDescriptor(offset, fieldType));
          + }
          + } else {
          + // expression has nested field
          + if (fieldType instanceof CompositeType) {
          + ((CompositeType) fieldType).getFlatFields(tail, offset, result);
          + } else {
          + throw new InvalidFieldReferenceException(
          + "Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + ".");
          + }
          + }
          + }
          + }
          +
          + @Override
          + public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
          + Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression);
          + if (!matcher.matches()) {
          + if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) ||
          + fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
          + throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
          + } else {
          + throw new InvalidFieldReferenceException("Invalid format of Row field expression \""+fieldExpression+"\".");
          + }
          + }
          +
          + String field = matcher.group(1);
          +
          + Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
          + TypeInformation<X> fieldType = null;
          + if (intFieldMatcher.matches()) {
          + // field expression is an integer
          + int fieldIndex = Integer.valueOf(field);
          + if (fieldIndex > this.getArity()) {
          + throw new InvalidFieldReferenceException(
          + "Row field expression \"" + field + "\" out of bounds of " + this.toString() + ".");
          + }
          + fieldType = this.getTypeAt(fieldIndex);
          + } else {
          — End diff –

          Translate `field` into an index with `getFieldIndex()` and use a common path to fetch the type?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3020#discussion_r93404965

          — Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java —
          @@ -18,12 +18,86 @@
          package org.apache.flink.api.java.typeutils;

          import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.CompositeType;
          +import org.junit.BeforeClass;
          import org.junit.Test;

          +import java.util.ArrayList;
          +import java.util.Arrays;
          +import java.util.List;
          +
          +import static org.junit.Assert.assertArrayEquals;
          import static org.junit.Assert.assertEquals;
          import static org.junit.Assert.assertNotEquals;
          +import static org.junit.Assert.assertTrue;
          +import static org.junit.Assert.fail;

          public class RowTypeInfoTest {
          + private static List<TypeInformation<?>> typeList = new ArrayList<>();
          +
          +
          + @BeforeClass
          + public static void setUp() throws Exception {
          + typeList.add(BasicTypeInfo.INT_TYPE_INFO);
          + typeList.add(new RowTypeInfo(
          + BasicTypeInfo.SHORT_TYPE_INFO,
          + BasicTypeInfo.BIG_DEC_TYPE_INFO));
          + typeList.add(BasicTypeInfo.STRING_TYPE_INFO);
          + }
          +
          +
          + @Test
          + public void testDuplicateCustomFieldNames() {
          — End diff –

          Split test and use `@Test(expected = IllegalArgumentException.class)`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3020#discussion_r93405114

          — Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java —
          @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
          }
          }

          + public RowTypeInfo(List<TypeInformation<?>> types, List<String> fieldNames) {
          — End diff –

          Can we use arrays instead of lists to be consistent with the other constructor?
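
A rough sketch of what an array-based constructor with the same validation could look like. `NamedRowSketch` is a hypothetical stand-in, not the signature that was eventually merged: field types are modelled as plain strings, and the null check uses a plain `NullPointerException` instead of Flink's precondition helpers.

```java
import java.util.Arrays;
import java.util.HashSet;

/** Hypothetical stand-in for RowTypeInfo; field types are plain strings in this sketch. */
final class NamedRowSketch {
    private final String[] types;
    private final String[] fieldNames;

    NamedRowSketch(String[] types, String[] fieldNames) {
        if (types == null || fieldNames == null) {
            throw new NullPointerException("Types and field names must not be null.");
        }
        if (types.length != fieldNames.length) {
            throw new IllegalArgumentException("Number of field types and names is different.");
        }
        // A set drops duplicates, so a smaller set size means some name occurs more than once.
        if (new HashSet<>(Arrays.asList(fieldNames)).size() != fieldNames.length) {
            throw new IllegalArgumentException("Field names are not unique.");
        }
        // Defensive copies keep later changes to the caller's arrays from leaking in.
        this.types = types.clone();
        this.fieldNames = fieldNames.clone();
    }

    String[] getFieldNames() {
        return fieldNames.clone();
    }

    int getArity() {
        return types.length;
    }
}
```

Taking arrays also removes the element-by-element copy loop from the list-based version, since `clone()` copies the names directly.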

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3020#discussion_r93566058

          — Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java —
          @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
          }
          }

          + public RowTypeInfo(List<TypeInformation<?>> types, List<String> fieldNames) {
          + super(Row.class, types == null ? null : types.toArray(new TypeInformation[types.size()]));
          + checkNotNull(fieldNames, "FieldNames should not be null.");
          + checkArgument(
          + types.size() == fieldNames.size(),
          + "Number of field types and names is different.");
          + checkArgument(
          + types.size() == new HashSet<>(fieldNames).size(),
          + "Field names are not unique.");
          +
          + this.fieldNames = new String[fieldNames.size()];
          +
          + for (int i = 0; i < fieldNames.size(); i++) {
          + this.fieldNames[i] = fieldNames.get(i);
          + }
          + }
          +
          + @Override
          + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
          — End diff –

          This is ported almost verbatim from `CaseClassTypeInfo.getFlatFields()`, except for the field index: `CaseClassTypeInfo` is 1-based, whereas `RowTypeInfo` is 0-based.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3020#discussion_r93566506

          — Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java —
          @@ -54,6 +76,152 @@ public RowTypeInfo(TypeInformation<?>... types) {
          }
          }

          + public RowTypeInfo(List<TypeInformation<?>> types, List<String> fieldNames) {
          + super(Row.class, types == null ? null : types.toArray(new TypeInformation[types.size()]));
          + checkNotNull(fieldNames, "FieldNames should not be null.");
          + checkArgument(
          + types.size() == fieldNames.size(),
          + "Number of field types and names is different.");
          + checkArgument(
          + types.size() == new HashSet<>(fieldNames).size(),
          + "Field names are not unique.");
          +
          + this.fieldNames = new String[fieldNames.size()];
          +
          + for (int i = 0; i < fieldNames.size(); i++) {
          + this.fieldNames[i] = fieldNames.get(i);
          + }
          + }
          +
          + @Override
          + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
          + Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
          +
          + if (!matcher.matches()) {
          + throw new InvalidFieldReferenceException(
          + "Invalid tuple field reference \"" + fieldExpression + "\".");
          + }
          +
          + String field = matcher.group(0);
          +
          + if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
          + (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
          + // handle select all
          + int keyPosition = 0;
          + for (TypeInformation<?> fType : types) {
          + if (fType instanceof CompositeType) {
          + CompositeType<?> cType = (CompositeType<?>) fType;
          + cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result);
          + keyPosition += cType.getTotalFields() - 1;
          + } else {
          + result.add(new FlatFieldDescriptor(offset + keyPosition, fType));
          + }
          + keyPosition++;
          + }
          + } else {
          + field = matcher.group(1);
          +
          + Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
          + TypeInformation<?> fieldType = null;
          + if (intFieldMatcher.matches()) {
          + // field expression is an integer
          + int fieldIndex = Integer.valueOf(field);
          + if (fieldIndex > this.getArity()) {
          + throw new InvalidFieldReferenceException(
          + "Row field expression \"" + field + "\" out of bounds of " + this.toString() + ".");
          + }
          + for (int i = 0; i < fieldIndex; i++) {
          + offset += this.getTypeAt(i).getTotalFields();
          + }
          + fieldType = this.getTypeAt(fieldIndex);
          + } else {
          + for (int i = 0; i < this.fieldNames.length; i++) {
          + if (fieldNames[i].equals(field)) {
          + // found field
          + fieldType = this.getTypeAt(i);
          + break;
          + }
          + offset += this.getTypeAt(i).getTotalFields();
          + }
          + if (fieldType == null) {
          + throw new InvalidFieldReferenceException(
          + "Unable to find field \"" + field + "\" in type " + this.toString() + ".");
          + }
          + }
          +
          + String tail = matcher.group(3);
          +
          + if (tail == null) {
          + // expression hasn't nested field
          + if (fieldType instanceof CompositeType) {
          + ((CompositeType) fieldType).getFlatFields("*", offset, result);
          + } else {
          + result.add(new FlatFieldDescriptor(offset, fieldType));
          + }
          + } else {
          + // expression has nested field
          + if (fieldType instanceof CompositeType) {
          + ((CompositeType) fieldType).getFlatFields(tail, offset, result);
          + } else {
          + throw new InvalidFieldReferenceException(
          + "Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + ".");
          + }
          + }
          + }
          + }
          +
          + @Override
          + public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
          — End diff –

          This is ported almost verbatim from `CaseClassTypeInfo.getFlatFields()`, except for the field index: `CaseClassTypeInfo` is 1-based, whereas `RowTypeInfo` is 0-based.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/3020

          Thank you @fhueske, I think your points are very good. I have changed my code according to your suggestions.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3020

          Thanks for the update @wuchong!
          The PR is good to merge.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3020

          merging

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3020

          fhueske Fabian Hueske added a comment -

          Implemented with d163f841610644b51749e9449a5a8be982384fb7

          twalthr Timo Walther added a comment -

          Fixed in 1.2: d163f841610644b51749e9449a5a8be982384fb7

          twalthr Timo Walther added a comment -

          Sorry I meant: 386bdd299d3df60562d86e36517410fe44a06291


            People

            • Assignee:
              jark Jark Wu
              Reporter:
              jark Jark Wu
            • Votes:
              0
              Watchers:
              5
