Flink / FLINK-6783

Wrongly extracted TypeInformations for WindowedStream::aggregate

    Details

      Description

      The following test fails because the output type of the AggregateFunction is extracted incorrectly:

      @Test
      public void testAggregateWithWindowFunctionDifferentResultTypes() throws Exception {
      	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
      
      	DataStream<Tuple3<String, String, Integer>> window = source
      		.keyBy(new TupleKeySelector())
      		.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
      		.aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
      			@Override
      			public Tuple2<String, Integer> createAccumulator() {
      				return Tuple2.of("", 0);
      			}
      
      			@Override
      			public void add(
      				Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
      
      			}
      
      			@Override
      			public String getResult(Tuple2<String, Integer> accumulator) {
      				return accumulator.f0;
      			}
      
      			@Override
      			public Tuple2<String, Integer> merge(
      				Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
      				return Tuple2.of("", 0);
      			}
      		}, new WindowFunction<String, Tuple3<String, String, Integer>, String, TimeWindow>() {
      			@Override
      			public void apply(
      				String s,
      				TimeWindow window,
      				Iterable<String> input,
      				Collector<Tuple3<String, String, Integer>> out) throws Exception {
      				out.collect(Tuple3.of("", "", 0));
      			}
      		});
      
      	OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
      		(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
      
      	OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
      
      	Assert.assertTrue(operator instanceof WindowOperator);
      	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
      		(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
      
      	Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
      	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
      	Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
      
      	processElementAndEnsureOutput(
      		operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
      }
      

      The test results in

      org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Tuple type expected.
      
      	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1157)
      	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:451)
      	at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:855)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowTranslationTest.testAggregateWithWindowFunctionDifferentResultTypes(WindowTranslationTest.java:702)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
      	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
      	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
      	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
      	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
      Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Tuple type expected.
      	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1204)
      	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1154)
      	... 25 more
      

      I tracked down the issue; the reason is a wrongly handled outputTypeArgumentIndex in TypeExtractor::getUnaryOperatorReturnType.

      My proposition is to remove/deprecate the version of TypeExtractor::getUnaryOperatorReturnType that accepts hasIterable and hasCollector as parameters, and to change all invocations to explicitly pass the index of the output type (after fixing the outputTypeArgumentIndex handling at TypeExtractor line 455).
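The core operation behind index-based output-type extraction is pulling the n-th type argument out of a parameterized type via reflection. As a rough illustration of that mechanic (a standalone, hypothetical demo, not Flink's actual TypeExtractor code; it uses IllegalArgumentException in place of Flink's InvalidTypesException):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

// Standalone demo (not Flink code): extract the index-th type argument
// of a parameterized type, the building block of index-based extraction.
public class TypeArgumentDemo {

    static Type extractTypeArgument(Type t, int index) {
        if (!(t instanceof ParameterizedType)) {
            throw new IllegalArgumentException("The given type " + t + " is not a parameterized type.");
        }
        Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments();
        if (index < 0 || index >= actualTypeArguments.length) {
            throw new IllegalArgumentException("Cannot extract the type argument with index " + index);
        }
        return actualTypeArguments[index];
    }

    // A field declaration gives us a concrete ParameterizedType to inspect.
    static Map<String, List<Integer>> sample;

    static Type sampleFieldType() {
        try {
            return TypeArgumentDemo.class.getDeclaredField("sample").getGenericType();
        } catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Type mapType = sampleFieldType();
        System.out.println(extractTypeArgument(mapType, 0)); // class java.lang.String
        System.out.println(extractTypeArgument(mapType, 1)); // java.util.List<java.lang.Integer>
    }
}
```

Passing an explicit index this way avoids guessing the output position from hasIterable/hasCollector flags.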

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r122657181

          — Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java —
          @@ -161,6 +164,77 @@ public static LambdaExecutable checkAndExtractLambda(Function function) throws T
          }

          /**
          + * Extracts type from given index from lambda. It supports nested types.
          + *
          + * @param exec lambda function to extract the type from
          + * @param lambdaTypeArgumentIndices position of type to extract in type hierarchy
          + * @param paramLen count of total parameters of the lambda (including closure parameters)
          + * @param baseParametersLen count of lambda interface parameters (without closure parameters)
          + * @return extracted type
          + */
          + public static Type extractTypeFromLambda(
          + LambdaExecutable exec,
          + int[] lambdaTypeArgumentIndices,
          + int paramLen,
          + int baseParametersLen) {
          + Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]];
          + for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
          +     output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]);
          + }
          + return output;
          + }
          +
          + /**
          + * This method extracts the n-th type argument from the given type. An InvalidTypesException
          + * is thrown if the type does not have any type arguments or if the index exceeds the number
          + * of type arguments.
          + *
          + * @param t Type to extract the type arguments from
          + * @param index Index of the type argument to extract
          + * @return The extracted type argument
          + * @throws InvalidTypesException if the given type does not have any type arguments or if the
          + * index exceeds the number of type arguments.
          + */
          + public static Type extractTypeArgument(Type t, int index) throws InvalidTypesException {
          + if (t instanceof ParameterizedType) {
          + Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments();
          +
          + if (index < 0 || index >= actualTypeArguments.length) {
          +     throw new InvalidTypesException("Cannot extract the type argument with index "
          +         + index + " because the type has only " + actualTypeArguments.length
          +         + " type arguments.");
          + } else {
          +     return actualTypeArguments[index];
          + }
          + } else {
          +     throw new InvalidTypesException("The given type " + t + " is not a parameterized type.");
          + }
          + }
          +
          + /**
          + * Extracts a Single Abstract Method (SAM) as defined in Java Specification (4.3.2. The Class Object,
          + * 9.8 Functional Interfaces, 9.4.3 Interface Method Body) from given class.
          + *
          + * @param baseClass
          + * @throws InvalidTypesException if the given class does not implement
          + * @return
          + */
          + public static Method getSingleAbstractMethod(Class<?> baseClass) {
          + Method sam = null;
          + for (Method method : baseClass.getMethods()) {
          + if (Modifier.isAbstract(method.getModifiers())) {
          + if (sam == null) {
          +     sam = method;
          + } else {
          +     throw new InvalidTypesException(
          +         "Given class: " + baseClass + " is not a FunctionalInterface. It does not have a SAM.");
          — End diff –

          @tedyu You are right. I've created a PR with a fix. Could you have a look? #4140

          githubbot ASF GitHub Bot added a comment -

          Github user tedyu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r122570865

          — Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java —
          @@ -161,6 +164,77 @@ public static LambdaExecutable checkAndExtractLambda(Function function) throws T
          }

          /**
          + * Extracts type from given index from lambda. It supports nested types.
          + *
          + * @param exec lambda function to extract the type from
          + * @param lambdaTypeArgumentIndices position of type to extract in type hierarchy
          + * @param paramLen count of total parameters of the lambda (including closure parameters)
          + * @param baseParametersLen count of lambda interface parameters (without closure parameters)
          + * @return extracted type
          + */
          + public static Type extractTypeFromLambda(
          + LambdaExecutable exec,
          + int[] lambdaTypeArgumentIndices,
          + int paramLen,
          + int baseParametersLen) {
          + Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]];
          + for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
          +     output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]);
          + }
          + return output;
          + }
          +
          + /**
          + * This method extracts the n-th type argument from the given type. An InvalidTypesException
          + * is thrown if the type does not have any type arguments or if the index exceeds the number
          + * of type arguments.
          + *
          + * @param t Type to extract the type arguments from
          + * @param index Index of the type argument to extract
          + * @return The extracted type argument
          + * @throws InvalidTypesException if the given type does not have any type arguments or if the
          + * index exceeds the number of type arguments.
          + */
          + public static Type extractTypeArgument(Type t, int index) throws InvalidTypesException {
          + if (t instanceof ParameterizedType) {
          + Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments();
          +
          + if (index < 0 || index >= actualTypeArguments.length) {
          +     throw new InvalidTypesException("Cannot extract the type argument with index "
          +         + index + " because the type has only " + actualTypeArguments.length
          +         + " type arguments.");
          + } else {
          +     return actualTypeArguments[index];
          + }
          + } else {
          +     throw new InvalidTypesException("The given type " + t + " is not a parameterized type.");
          + }
          +
          + /**
          + * Extracts a Single Abstract Method (SAM) as defined in Java Specification (4.3.2. The Class Object,
          + * 9.8 Functional Interfaces, 9.4.3 Interface Method Body) from given class.
          + *
          + * @param baseClass
          + * @throws InvalidTypesException if the given class does not implement
          + * @return
          + */
          + public static Method getSingleAbstractMethod(Class<?> baseClass) {
          + Method sam = null;
          + for (Method method : baseClass.getMethods()) {
          + if (Modifier.isAbstract(method.getModifiers())) {
          + if (sam == null) {
          +     sam = method;
          + } else {
          +     throw new InvalidTypesException(
          +         "Given class: " + baseClass + " is not a FunctionalInterface. It does not have a SAM.");
          — End diff –

          This message seems to be inexact: if there is no SAM, sam would be null upon returning from the method.
          I suggest changing the message and adding a check (for null sam) prior to returning.
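The suggested fix could be sketched as follows (a hypothetical standalone version, not the actual Flink patch; IllegalStateException stands in for Flink's InvalidTypesException). The added null check makes the "no abstract method" case fail with a precise message instead of silently returning null:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Sketch of the reviewer's suggestion: distinguish "more than one abstract
// method" from "no abstract method at all", and never return null.
public class SamExtractor {

    public static Method getSingleAbstractMethod(Class<?> baseClass) {
        Method sam = null;
        for (Method method : baseClass.getMethods()) {
            if (Modifier.isAbstract(method.getModifiers())) {
                if (sam == null) {
                    sam = method;
                } else {
                    throw new IllegalStateException("Given class: " + baseClass
                        + " is not a FunctionalInterface. It has more than one abstract method.");
                }
            }
        }
        if (sam == null) {
            // The null check suggested above: without it, callers would hit an NPE later.
            throw new IllegalStateException("Given class: " + baseClass
                + " is not a FunctionalInterface. It has no abstract method.");
        }
        return sam;
    }

    public static void main(String[] args) {
        // Runnable has exactly one abstract method: run().
        System.out.println(getSingleAbstractMethod(Runnable.class).getName()); // run
    }
}
```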

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys closed the pull request at:

          https://github.com/apache/flink/pull/4089

          githubbot ASF GitHub Bot added a comment -

          Github user greghogan commented on the issue:

          https://github.com/apache/flink/pull/4089

          A hotfix is for when no FLINK ticket has been created, and typically no PR. This commit header should be something like `FLINK-6783 [streaming]`.

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys closed the pull request at:

          https://github.com/apache/flink/pull/4039

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on the issue:

          https://github.com/apache/flink/pull/4039

          I created a hotfix for the discussed issue: #4089 . I will close this PR then.

          githubbot ASF GitHub Bot added a comment -

          GitHub user dawidwys opened a pull request:

          https://github.com/apache/flink/pull/4089

          FLINK-6783[hotfix] Removed lamba indices for abstract classes

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [ ] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/dawidwys/flink lambda-type-hotfix

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4089.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4089


          commit ed19187f2c918dd283b7e0cea263efb175d73742
          Author: Dawid Wysakowicz <dawid@getindata.com>
          Date: 2017-06-08T13:13:58Z

          FLINK-6783[hotfix] Removed lamba indices for abstract classes


          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120882223

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java —
          @@ -507,12 +505,41 @@ public AllWindowedStream(DataStream<T> input,
          TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
          aggFunction, input.getType(), null, false);

          - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
          -     windowFunction, AllWindowFunction.class, true, true, aggResultType, null, false);
          + TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType);

          return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
          }

          + private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType(
          + AllWindowFunction<IN, OUT, ?> function,
          + TypeInformation<IN> inType) {
          + return TypeExtractor.getUnaryOperatorReturnType(
          + function,
          + AllWindowFunction.class,
          + 0,
          + 1,
          + new int[]{1, 0},
          + new int[]{2, 0},
          + inType,
          + null,
          + false);
          + }
          +
          + private static <IN, OUT> TypeInformation<OUT> getProcessAllWindowFunctionReturnType(
          + ProcessAllWindowFunction<IN, OUT, ?> function,
          + TypeInformation<IN> inType) {
          + return TypeExtractor.getUnaryOperatorReturnType(
          + function,
          + ProcessAllWindowFunction.class,
          + 0,
          + 1,
          + new int[]{1, 0},
          — End diff –

          Yes, it is not a serious problem. But we should change it to be consistent. @dawidwys Can you create a hot fix for it?
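For readers following the index arrays in this diff: the first entry selects a SAM parameter and each subsequent entry descends into that parameter's type arguments, so {1, 0} reads "first type argument of parameter 1" (the Iterable input type) and {2, 0} "first type argument of parameter 2" (the Collector output type). A hypothetical standalone sketch of that walk, using demo types rather than Flink's API:

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;

// Demo (not Flink code) of how an index array addresses a type inside
// a SAM signature: first index = parameter position, rest = nesting.
public class IndexWalkDemo {

    // A stand-in SAM with the same parameter shape as AllWindowFunction.apply:
    // (window, Iterable<IN>, Collector<OUT>) -- Collection replaces Collector here.
    interface DemoFunction {
        void apply(Long window, Iterable<String> values, Collection<Integer> out);
    }

    static Type walk(Method sam, int[] indices) {
        // indices[0] picks the parameter; remaining indices descend into type arguments.
        Type t = sam.getGenericParameterTypes()[indices[0]];
        for (int i = 1; i < indices.length; i++) {
            t = ((ParameterizedType) t).getActualTypeArguments()[indices[i]];
        }
        return t;
    }

    static Method sam() {
        try {
            return DemoFunction.class.getMethod("apply", Long.class, Iterable.class, Collection.class);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(walk(sam(), new int[]{1, 0})); // input type:  class java.lang.String
        System.out.println(walk(sam(), new int[]{2, 0})); // output type: class java.lang.Integer
    }
}
```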

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120875203

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java —
          @@ -507,12 +505,41 @@ public AllWindowedStream(DataStream<T> input,
          TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
          aggFunction, input.getType(), null, false);

          - TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
          -     windowFunction, AllWindowFunction.class, true, true, aggResultType, null, false);
          + TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType);

          return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
          }

          + private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType(
          + AllWindowFunction<IN, OUT, ?> function,
          + TypeInformation<IN> inType) {
          + return TypeExtractor.getUnaryOperatorReturnType(
          + function,
          + AllWindowFunction.class,
          + 0,
          + 1,
          + new int[]{1, 0},
          + new int[]{2, 0},
          + inType,
          + null,
          + false);
          + }
          +
          + private static <IN, OUT> TypeInformation<OUT> getProcessAllWindowFunctionReturnType(
          + ProcessAllWindowFunction<IN, OUT, ?> function,
          + TypeInformation<IN> inType) {
          + return TypeExtractor.getUnaryOperatorReturnType(
          + function,
          + ProcessAllWindowFunction.class,
          + 0,
          + 1,
          + new int[]{1, 0},
          — End diff –

          Ha, it seems I might have merged too fast. What will happen if we leave it as is? Shouldn't the analysis simply fail? We should probably just push a hot fix for this.

          Sorry for the inconvenience.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4039

          @dawidwys Could you please close this PR?

          aljoscha Aljoscha Krettek added a comment -

          Fixed on release-1.3 in
          b2d6dc1d4914f29f6ed1120daff645b3c8073716

          Fixed on master in
          bcaf816dc5313c6c7de1e3436cc87036fd2ea3d0

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120862262

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---
          @@ -507,12 +505,41 @@ public AllWindowedStream(DataStream<T> input,
           	TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
           		aggFunction, input.getType(), null, false);

          -	TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
          -		windowFunction, AllWindowFunction.class, true, true, aggResultType, null, false);
          +	TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType);

           	return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
           }

          +	private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType(
          +			AllWindowFunction<IN, OUT, ?> function,
          +			TypeInformation<IN> inType) {
          +		return TypeExtractor.getUnaryOperatorReturnType(
          +			function,
          +			AllWindowFunction.class,
          +			0,
          +			1,
          +			new int[]{1, 0},
          +			new int[]{2, 0},
          +			inType,
          +			null,
          +			false);
          +	}
          +
          +	private static <IN, OUT> TypeInformation<OUT> getProcessAllWindowFunctionReturnType(
          +			ProcessAllWindowFunction<IN, OUT, ?> function,
          +			TypeInformation<IN> inType) {
          +		return TypeExtractor.getUnaryOperatorReturnType(
          +			function,
          +			ProcessAllWindowFunction.class,
          +			0,
          +			1,
          +			new int[]{1, 0},
          --- End diff ---

          You are right! I was not aware of that. Sorry for that. Shall I change it then to `NO_INDEX`?

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120860095

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---
          @@ -507,12 +505,41 @@ public AllWindowedStream(DataStream<T> input,
           	TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
           		aggFunction, input.getType(), null, false);

          -	TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
          -		windowFunction, AllWindowFunction.class, true, true, aggResultType, null, false);
          +	TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType);

           	return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
           }

          +	private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType(
          +			AllWindowFunction<IN, OUT, ?> function,
          +			TypeInformation<IN> inType) {
          +		return TypeExtractor.getUnaryOperatorReturnType(
          +			function,
          +			AllWindowFunction.class,
          +			0,
          +			1,
          +			new int[]{1, 0},
          +			new int[]{2, 0},
          +			inType,
          +			null,
          +			false);
          +	}
          +
          +	private static <IN, OUT> TypeInformation<OUT> getProcessAllWindowFunctionReturnType(
          +			ProcessAllWindowFunction<IN, OUT, ?> function,
          +			TypeInformation<IN> inType) {
          +		return TypeExtractor.getUnaryOperatorReturnType(
          +			function,
          +			ProcessAllWindowFunction.class,
          +			0,
          +			1,
          +			new int[]{1, 0},
          --- End diff ---

          @dawidwys correct me if I'm wrong, but as far as I know, a `FunctionalInterface` must be an interface. All `ProcessXXX` functions are regular classes with one abstract method, but this does not qualify them for lambdas. See also https://stackoverflow.com/questions/24610207/abstract-class-as-functional-interface
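          The distinction above can be sketched with a minimal, self-contained example; the type names below are hypothetical stand-ins, not Flink classes:

          ```java
          public class FunctionalInterfaceDemo {

              // Qualifies as a functional interface: it is an *interface* with
              // exactly one abstract method, so a lambda can implement it.
              @FunctionalInterface
              interface MyWindowFn<IN, OUT> {
                  OUT apply(IN input);
              }

              // Does NOT qualify: it is a class. Even with a single abstract
              // method, a class can never be the target of a lambda expression
              // (this mirrors Flink's ProcessWindowFunction and friends).
              abstract static class MyProcessFn<IN, OUT> {
                  public abstract OUT process(IN input);
              }

              public static void main(String[] args) {
                  MyWindowFn<Integer, String> fn = in -> "value=" + in;   // compiles
                  System.out.println(fn.apply(42));                       // value=42

                  // The following would be a compile error:
                  // MyProcessFn<Integer, String> p = in -> "value=" + in;
                  // A class must be subclassed instead, e.g. anonymously:
                  MyProcessFn<Integer, String> p = new MyProcessFn<Integer, String>() {
                      @Override
                      public String process(Integer input) {
                          return "value=" + input;
                      }
                  };
                  System.out.println(p.process(7));                       // value=7
              }
          }
          ```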

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4039

          @dawidwys I merged to master, now I rebased on `release-1.3` and running tests before merging there as well.

          Thanks for fixing this so quickly! 👌

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120558405

          --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
          @@ -464,57 +578,52 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
           		}
           	}

          -	/**
          -	 * Returns the binary operator's return type.
          -	 *
          -	 * @param function Function to extract the return type from
          -	 * @param baseClass Base class of the function
          -	 * @param hasIterables True if the first function parameter is an iterable, otherwise false
          -	 * @param hasCollector True if the function has an additional collector parameter, otherwise false
          -	 * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type)
          -	 * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type)
          -	 * @param functionName Function name
          -	 * @param allowMissing Can the type information be missing
          -	 * @param <IN1> Left side input type
          -	 * @param <IN2> Right side input type
          -	 * @param <OUT> Output type
          -	 * @return TypeInformation of the return type of the function
          -	 */
          -	@SuppressWarnings("unchecked")
          -	@PublicEvolving
          -	public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(
          -		Function function,
          -		Class<?> baseClass,
          -		boolean hasIterables,
          -		boolean hasCollector,
          -		TypeInformation<IN1> in1Type,
          -		TypeInformation<IN2> in2Type,
          -		String functionName,
          -		boolean allowMissing) {
          +	private static Type extractType(
          +			LambdaExecutable exec,
          +			int[] lambdaTypeArgumentIndices,
          +			int paramLen,
          +			int baseParametersLen) {
          +		Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]];
          +		for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
          +			output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]);
          +		}
          +		return output;
          +	}
          -		return getBinaryOperatorReturnType(
          -			function,
          -			baseClass,
          -			hasIterables ? 0 : -1,
          -			hasCollector ? 0 : -1,
          -			in1Type,
          -			in2Type,
          -			functionName,
          -			allowMissing
          -		);
          +	private static Method getSingleAbstractMethod(Class<?> baseClass) {
          --- End diff ---

          done
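          For illustration, the index-walking idea behind `extractType` in the diff above can be reproduced with plain `java.lang.reflect`; the `Fn` interface and `extract` helper below are hypothetical stand-ins, not Flink code. `indices[0]` selects a method parameter, and each subsequent index descends one level into that parameter's generic type arguments:

          ```java
          import java.lang.reflect.Method;
          import java.lang.reflect.ParameterizedType;
          import java.lang.reflect.Type;
          import java.util.List;
          import java.util.Map;

          public class IndexExtractDemo {

              // Hypothetical SAM type with nested generic parameters.
              interface Fn {
                  void apply(String key, Iterable<Integer> in, List<Map<String, Long>> out);
              }

              // indices[0] picks the parameter; remaining indices descend into
              // its generic type arguments (mirrors the loop in extractType).
              static Type extract(Method m, int[] indices) {
                  Type t = m.getGenericParameterTypes()[indices[0]];
                  for (int i = 1; i < indices.length; i++) {
                      t = ((ParameterizedType) t).getActualTypeArguments()[indices[i]];
                  }
                  return t;
              }

              public static void main(String[] args) throws Exception {
                  Method m = Fn.class.getMethod("apply", String.class, Iterable.class, List.class);
                  // {1, 0}: parameter 1 (Iterable<Integer>), type argument 0 -> Integer
                  System.out.println(extract(m, new int[]{1, 0}));
                  // {2, 0, 1}: parameter 2 (List<Map<String, Long>>),
                  // argument 0 (Map<String, Long>), argument 1 -> Long
                  System.out.println(extract(m, new int[]{2, 0, 1}));
              }
          }
          ```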

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120558300

          --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
          @@ -354,49 +479,21 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
           	/**
          -	 * Returns the unary operator's return type.
          -	 *
          -	 * @param function Function to extract the return type from
          -	 * @param baseClass Base class of the function
          -	 * @param hasIterable True if the first function parameter is an iterable, otherwise false
          -	 * @param hasCollector True if the function has an additional collector parameter, otherwise false
          -	 * @param inType Type of the input elements (In case of an iterable, it is the element type)
          -	 * @param functionName Function name
          -	 * @param allowMissing Can the type information be missing
          -	 * @param <IN> Input type
          -	 * @param <OUT> Output type
          -	 * @return TypeInformation of the return type of the function
          -	 */
          -	@SuppressWarnings("unchecked")
          -	@PublicEvolving
          -	public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(
          -		Function function,
          -		Class<?> baseClass,
          -		boolean hasIterable,
          -		boolean hasCollector,
          -		TypeInformation<IN> inType,
          -		String functionName,
          -		boolean allowMissing) {
          -
          -		return getUnaryOperatorReturnType(
          -			function,
          -			baseClass,
          -			hasIterable ? 0 : -1,
          -			hasCollector ? 0 : -1,
          -			inType,
          -			functionName,
          -			allowMissing);
          -	}
          -
          -	/**
          -	 * Returns the unary operator's return type.
          +	 * <p><b>NOTE:</b> lambda type indices allows extraction of Type from lambdas. To extract input type <b>IN</b>
          --- End diff ---

          done

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120558346

          --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
          @@ -422,37 +521,52 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
           		throw new InvalidTypesException("Internal error occurred.", e);
           	}
           	if (exec != null) {
          +		Preconditions.checkArgument(
          +			lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1,
          +			"Indices for input type arguments within lambda not provided");
          +		Preconditions.checkArgument(
          +			lambdaOutputTypeArgumentIndices != null,
          +			"Indices for output type arguments within lambda not provided");
           		// check for lambda type erasure
           		validateLambdaGenericParameters(exec);

           		// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
          -		final int paramLen = exec.getParameterTypes().length - 1;
          +		final int paramLen = exec.getParameterTypes().length;
          +
          +		final Method sam = getSingleAbstractMethod(baseClass);
          +		final int baseParametersLen = sam.getParameterTypes().length;
          --- End diff ---

          done

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120556550

          --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
          @@ -216,7 +253,15 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
           		boolean allowMissing)
           	{
           		return getUnaryOperatorReturnType(
          -			function, AggregateFunction.class, 0, 2, inType, functionName, allowMissing);
          +			function,
          +			AggregateFunction.class,
          +			0,
          +			2,
          +			new int[]{0},
          --- End diff ---

          done

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120556450

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
          @@ -981,12 +976,42 @@ public WindowedStream(KeyedStream<T, K> input,
           	TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
           		aggFunction, input.getType(), null, false);

          -	TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
          -		windowFunction, ProcessWindowFunction.class, true, true, aggResultType, null, false);
          +	TypeInformation<R> resultType = getProcessWindowFunctionReturnType(windowFunction, aggResultType, null);

           	return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
           }

          +	private static <IN, OUT, KEY> TypeInformation<OUT> getWindowFunctionReturnType(
          +			WindowFunction<IN, OUT, KEY, ?> function,
          +			TypeInformation<IN> inType) {
          +		return TypeExtractor.getUnaryOperatorReturnType(
          +			function,
          +			WindowFunction.class,
          +			0,
          +			1,
          +			new int[]{2, 0},
          +			new int[]{3, 0},
          +			inType,
          +			null,
          +			false);
          +	}
          +
          +	private static <IN, OUT, KEY> TypeInformation<OUT> getProcessWindowFunctionReturnType(
          +			ProcessWindowFunction<IN, OUT, KEY, ?> function,
          +			TypeInformation<IN> inType,
          +			String functionName) {
          +		return TypeExtractor.getUnaryOperatorReturnType(
          +			function,
          +			ProcessWindowFunction.class,
          +			0,
          +			1,
          +			new int[]{2, 0},
          --- End diff ---

          I think it is lambda

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120558383

          --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
          @@ -464,57 +578,52 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
           		}
           	}

          -	/**
          -	 * Returns the binary operator's return type.
          -	 *
          -	 * @param function Function to extract the return type from
          -	 * @param baseClass Base class of the function
          -	 * @param hasIterables True if the first function parameter is an iterable, otherwise false
          -	 * @param hasCollector True if the function has an additional collector parameter, otherwise false
          -	 * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type)
          -	 * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type)
          -	 * @param functionName Function name
          -	 * @param allowMissing Can the type information be missing
          -	 * @param <IN1> Left side input type
          -	 * @param <IN2> Right side input type
          -	 * @param <OUT> Output type
          -	 * @return TypeInformation of the return type of the function
          -	 */
          -	@SuppressWarnings("unchecked")
          -	@PublicEvolving
          -	public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(
          -		Function function,
          -		Class<?> baseClass,
          -		boolean hasIterables,
          -		boolean hasCollector,
          -		TypeInformation<IN1> in1Type,
          -		TypeInformation<IN2> in2Type,
          -		String functionName,
          -		boolean allowMissing) {
          +	private static Type extractType(
          --- End diff ---

          done

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120556277

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
          @@ -267,13 +267,15 @@ private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
           	public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {

           		TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
          -			processFunction,
          -			ProcessFunction.class,
          -			false,
          -			true,
          -			getType(),
          -			Utils.getCallLocationName(),
          -			true);
          +			processFunction,
          +			ProcessFunction.class,
          +			0,
          +			1,
          +			new int[]{0},
          --- End diff ---

          I think it is lambda

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120555777

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java ---
          @@ -227,9 +237,19 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
           	public <R> SingleOutputStreamOperator<R> flatMap(
           			CoFlatMapFunction<IN1, IN2, R> coFlatMapper) {

          -		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
          -			CoFlatMapFunction.class, false, true, getType1(), getType2(),
          -			Utils.getCallLocationName(), true);
          +		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
          +			coFlatMapper,
          +			CoFlatMapFunction.class,
          +			0,
          +			1,
          +			2,
          +			new int[]{0},
          --- End diff ---

          done

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120555933

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java —
          @@ -574,13 +574,15 @@ public ExecutionConfig getExecutionConfig() {
          public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {

          TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(

-	processFunction,
-	ProcessFunction.class,
-	false,
-	true,
-	getType(),
-	Utils.getCallLocationName(),
-	true);
+	processFunction,
+	ProcessFunction.class,
+	0,
+	1,
+	new int[] {0},
— End diff –

          I think it is lambda

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120556233

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java —
          @@ -300,14 +304,18 @@ protected WithWindow(DataStream<T1> input1,
          */
          public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
          TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(

-	function,
-	FlatJoinFunction.class,
-	true,
-	true,
-	input1.getType(),
-	input2.getType(),
-	"Join",
-	false);
+	function,
+	FlatJoinFunction.class,
+	0,
+	1,
+	2,
+	new int[] {0, 0},
— End diff –

          done

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120556140

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java —
          @@ -221,14 +221,18 @@ protected WithWindow(DataStream<T1> input1,
          */
          public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
          TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(

-	function,
-	JoinFunction.class,
-	true,
-	true,
-	input1.getType(),
-	input2.getType(),
-	"Join",
-	false);
+	function,
+	JoinFunction.class,
+	0,
+	1,
+	2,
+	new int[] {0, 0},
— End diff –

          done

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120555882

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java —
          @@ -254,9 +274,19 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
          public <R> SingleOutputStreamOperator<R> process(
          CoProcessFunction<IN1, IN2, R> coProcessFunction) {

-	TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coProcessFunction,
-		CoProcessFunction.class, false, true, getType1(), getType2(),
-		Utils.getCallLocationName(), true);
+	TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
+		coProcessFunction,
+		CoProcessFunction.class,
+		0,
+		1,
+		2,
+		new int[] {0},
— End diff –

          done

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120555725

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java —
          @@ -231,14 +231,18 @@ protected WithWindow(DataStream<T1> input1,
          public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {

          TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(

-	function,
-	CoGroupFunction.class,
-	true,
-	true,
-	input1.getType(),
-	input2.getType(),
-	"CoGroup",
-	false);
+	function,
— End diff –

          DONE

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120555761

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java —
          @@ -203,9 +203,19 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
          */
          public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) {

-	TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
-		CoMapFunction.class, false, true, getType1(), getType2(),
-		Utils.getCallLocationName(), true);
+	TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
+		coMapper,
+		CoMapFunction.class,
+		0,
+		1,
+		2,
+		new int[] {0},
— End diff –

          done

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/4039

          Cool, thank you!

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on the issue:

          https://github.com/apache/flink/pull/4039

          I will address them today. I am working on the last comment, enabling the changes also for Partitioner.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/4039

          @dawidwys What's your schedule to address the comments?
          This is one of the real blockers for the 1.3.1 release, and I would like to put the first RC this week.

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120412137

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java —
          @@ -507,12 +505,41 @@ public AllWindowedStream(DataStream<T> input,
          TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
          aggFunction, input.getType(), null, false);

-	TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-		windowFunction, AllWindowFunction.class, true, true, aggResultType, null, false);
+	TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType);

 	return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
 }

+private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType(
+		AllWindowFunction<IN, OUT, ?> function,
+		TypeInformation<IN> inType) {
+	return TypeExtractor.getUnaryOperatorReturnType(
+		function,
+		AllWindowFunction.class,
+		0,
+		1,
+		new int[] {1, 0},
+		new int[] {2, 0},
+		inType,
+		null,
+		false);
+}
+
+private static <IN, OUT> TypeInformation<OUT> getProcessAllWindowFunctionReturnType(
+		ProcessAllWindowFunction<IN, OUT, ?> function,
+		TypeInformation<IN> inType) {
+	return TypeExtractor.getUnaryOperatorReturnType(
+		function,
+		ProcessAllWindowFunction.class,
+		0,
+		1,
+		new int[] {1, 0},
— End diff –

          I think `ProcessAllWindowFunction` is a `FunctionalInterface`. The only abstract method is `process`; all the other methods have default implementations, so you can pass a lambda as a `ProcessAllWindowFunction`.
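The SAM-vs-not-SAM question above matters because a Java lambda only implements the raw functional interface, so the extractor cannot read the type arguments the way it can for an anonymous class. The sketch below illustrates this with a hypothetical `MyMapFunction` stand-in (not a real Flink interface), assuming nothing beyond the JDK reflection API:

```java
import java.lang.reflect.ParameterizedType;

// Hypothetical SAM interface, analogous in shape to Flink's function interfaces.
interface MyMapFunction<IN, OUT> {
    OUT map(IN value);
}

public class ErasureDemo {
    public static void main(String[] args) {
        // Anonymous class: the type arguments survive in the class metadata,
        // so getGenericInterfaces() yields MyMapFunction<String, Integer>.
        MyMapFunction<String, Integer> anon = new MyMapFunction<String, Integer>() {
            @Override
            public Integer map(String value) { return value.length(); }
        };
        System.out.println(anon.getClass().getGenericInterfaces()[0]
            instanceof ParameterizedType); // true

        // Lambda: the generated class implements only the raw interface, so no
        // ParameterizedType is available -- the extractor must instead inspect the
        // synthetic lambda method, which is why the new API takes index hints.
        MyMapFunction<String, Integer> lambda = String::length;
        System.out.println(lambda.getClass().getGenericInterfaces()[0]
            instanceof ParameterizedType); // false
    }
}
```

This is the distinction the review keeps returning to: for non-SAM types such as abstract classes, no lambda can ever be passed, so the lambda index hints are dead weight there.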

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120398682

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java —
          @@ -300,14 +304,18 @@ protected WithWindow(DataStream<T1> input1,
          */
          public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
          TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(

-	function,
-	FlatJoinFunction.class,
-	true,
-	true,
-	input1.getType(),
-	input2.getType(),
-	"Join",
-	false);
+	function,
+	FlatJoinFunction.class,
+	0,
+	1,
+	2,
+	new int[] {0, 0},
— End diff –

          Wrong lambda arguments. Should be `(0), (1), (2, 0)`
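The `(0), (1), (2, 0)` notation in the comment above maps parameter positions of the SAM method to type-argument positions. Using minimal stand-ins (not the real Flink `Collector`/`FlatJoinFunction` classes), the positions for `FlatJoinFunction.join` look like this:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for org.apache.flink.util.Collector, for illustration only.
interface Collector<T> { void collect(T record); }

// Stand-in mirroring FlatJoinFunction's single abstract method. The index hints
// discussed in the review refer to these positions:
//   first  -> parameter 0, the type itself          -> (0)
//   second -> parameter 1, the type itself          -> (1)
//   out    -> OUT is type argument 0 of parameter 2 -> (2, 0)
interface FlatJoinFunction<IN1, IN2, OUT> {
    void join(IN1 first, IN2 second, Collector<OUT> out);
}

public class IndexHintDemo {
    public static void main(String[] args) {
        // A lambda implementation: the output type String is only recoverable by
        // looking inside the Collector parameter, i.e. position (2, 0).
        FlatJoinFunction<String, Integer, String> f =
            (first, second, out) -> out.collect(first + ":" + second);

        List<String> results = new ArrayList<>();
        f.join("a", 1, results::add);
        System.out.println(results); // prints [a:1]
    }
}
```

So `new int[] {0, 0}` in the diff would point at type argument 0 of parameter 0 instead, which is why the reviewer flags the arguments as wrong.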

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120397389

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java —
          @@ -227,9 +237,19 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
          public <R> SingleOutputStreamOperator<R> flatMap(
          CoFlatMapFunction<IN1, IN2, R> coFlatMapper) {

-	TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
-		CoFlatMapFunction.class, false, true, getType1(), getType2(),
-		Utils.getCallLocationName(), true);
+	TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
+		coFlatMapper,
+		CoFlatMapFunction.class,
+		0,
+		1,
+		2,
+		new int[] {0},
— End diff –

          No lambda.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120398789

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java —
          @@ -267,13 +267,15 @@ private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
          public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {

          TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(

-	processFunction,
-	ProcessFunction.class,
-	false,
-	true,
-	getType(),
-	Utils.getCallLocationName(),
-	true);
+	processFunction,
+	ProcessFunction.class,
+	0,
+	1,
+	new int[] {0},
— End diff –

          No lambda.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120399201

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java —
          @@ -981,12 +976,42 @@ public WindowedStream(KeyedStream<T, K> input,
          TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
          aggFunction, input.getType(), null, false);

-	TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-		windowFunction, ProcessWindowFunction.class, true, true, aggResultType, null, false);
+	TypeInformation<R> resultType = getProcessWindowFunctionReturnType(windowFunction, aggResultType, null);

 	return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
 }

+private static <IN, OUT, KEY> TypeInformation<OUT> getWindowFunctionReturnType(
+		WindowFunction<IN, OUT, KEY, ?> function,
+		TypeInformation<IN> inType) {
+	return TypeExtractor.getUnaryOperatorReturnType(
+		function,
+		WindowFunction.class,
+		0,
+		1,
+		new int[] {2, 0},
+		new int[] {3, 0},
+		inType,
+		null,
+		false);
+}
+
+private static <IN, OUT, KEY> TypeInformation<OUT> getProcessWindowFunctionReturnType(
+		ProcessWindowFunction<IN, OUT, KEY, ?> function,
+		TypeInformation<IN> inType,
+		String functionName) {
+	return TypeExtractor.getUnaryOperatorReturnType(
+		function,
+		ProcessWindowFunction.class,
+		0,
+		1,
+		new int[] {2, 0},
— End diff –

          No lambda.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120397600

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java —
          @@ -574,13 +574,15 @@ public ExecutionConfig getExecutionConfig() {
          public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {

          TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(

-	processFunction,
-	ProcessFunction.class,
-	false,
-	true,
-	getType(),
-	Utils.getCallLocationName(),
-	true);
+	processFunction,
+	ProcessFunction.class,
+	0,
+	1,
+	new int[] {0},
— End diff –

          No lambda.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120397513

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java —
          @@ -254,9 +274,19 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
          public <R> SingleOutputStreamOperator<R> process(
          CoProcessFunction<IN1, IN2, R> coProcessFunction) {

-	TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coProcessFunction,
-		CoProcessFunction.class, false, true, getType1(), getType2(),
-		Utils.getCallLocationName(), true);
+	TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
+		coProcessFunction,
+		CoProcessFunction.class,
+		0,
+		1,
+		2,
+		new int[] {0},
— End diff –

          No lambda.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120396063

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java —
          @@ -507,12 +505,41 @@ public AllWindowedStream(DataStream<T> input,
          TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
          aggFunction, input.getType(), null, false);

-	TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-		windowFunction, AllWindowFunction.class, true, true, aggResultType, null, false);
+	TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType);

 	return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
 }

+private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType(
+		AllWindowFunction<IN, OUT, ?> function,
+		TypeInformation<IN> inType) {
+	return TypeExtractor.getUnaryOperatorReturnType(
+		function,
+		AllWindowFunction.class,
+		0,
+		1,
+		new int[] {1, 0},
+		new int[] {2, 0},
+		inType,
+		null,
+		false);
+}
+
+private static <IN, OUT> TypeInformation<OUT> getProcessAllWindowFunctionReturnType(
+		ProcessAllWindowFunction<IN, OUT, ?> function,
+		TypeInformation<IN> inType) {
+	return TypeExtractor.getUnaryOperatorReturnType(
+		function,
+		ProcessAllWindowFunction.class,
+		0,
+		1,
+		new int[] {1, 0},
— End diff –

          `ProcessAllWindowFunction` is not a SAM. We do not need these parameters.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120398290

— Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java —

@@ -221,14 +221,18 @@ protected WithWindow(DataStream<T1> input1,
 	 */
 	public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
 		TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
-			function,
-			JoinFunction.class,
-			true,
-			true,
-			input1.getType(),
-			input2.getType(),
-			"Join",
-			false);
+			function,
+			JoinFunction.class,
+			0,
+			1,
+			2,
+			new int[] {0, 0},

— End diff –

          Wrong lambda arguments. Should be `(0), (0), ()`.


          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120397178

— Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java —

@@ -203,9 +203,19 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
 	 */
 	public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) {
-		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
-			CoMapFunction.class, false, true, getType1(), getType2(),
-			Utils.getCallLocationName(), true);
+		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
+			coMapper,
+			CoMapFunction.class,
+			0,
+			1,
+			2,
+			new int[] {0},

— End diff –

          No lambda.


          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120397022

— Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java —

@@ -231,14 +231,18 @@ protected WithWindow(DataStream<T1> input1,
 	public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
 		TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
-			function,
-			CoGroupFunction.class,
-			true,
-			true,
-			input1.getType(),
-			input2.getType(),
-			"CoGroup",
-			false);
+			function,

— End diff –

          We could use `TypeExtractor#getCoGroupReturnTypes()` and save some duplicate code, but I'm also fine with this.


          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120389999

          — Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java —
          @@ -336,8 +461,8 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
          public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> partitioner, String functionName, boolean allowMissing) {
          return new TypeExtractor().privateCreateTypeInfo(Partitioner.class, partitioner.getClass(), 0, null, null);
          — End diff –

          Can you also update the `Partitioner` to support lambdas?


          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120384026

— Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java —

@@ -216,7 +253,15 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
 		boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			function, AggregateFunction.class, 0, 2, inType, functionName, allowMissing);
+			function,
+			AggregateFunction.class,
+			0,
+			2,
+			new int[] {0},

— End diff –

          I think this array is not necessary, because the `AggregateFunction` is not a SAM. Maybe you can change `NO_OUTPUT_INDEX` to `NO_INDEX` and also use it here.
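For readers following along: "SAM" here means a single-abstract-method (functional) interface, the only kind of type a Java lambda can implement, so the lambda index arrays only matter for SAM base classes. A minimal, self-contained sketch of detecting a SAM via reflection (my own illustration, not the PR's `getSingleAbstractMethod`):

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.function.Function;

public class SamCheck {

    // Returns the single abstract method if the interface is a SAM, else null.
    // Default and static interface methods are not abstract, so they are skipped.
    static Method singleAbstractMethod(Class<?> clazz) {
        Method found = null;
        for (Method m : clazz.getMethods()) {
            if (Modifier.isAbstract(m.getModifiers())) {
                if (found != null) {
                    return null; // more than one abstract method: not a SAM
                }
                found = m;
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Function declares apply() as its only abstract method
        System.out.println(singleAbstractMethod(Function.class).getName());
        // Comparable's only abstract method is compareTo()
        System.out.println(singleAbstractMethod(Comparable.class).getName());
        // Iterator has two abstract methods (hasNext, next): not a SAM
        System.out.println(singleAbstractMethod(java.util.Iterator.class));
    }
}
```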


          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120392260

— Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java —

@@ -464,57 +578,52 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
 		}
 	}

-	/**
-	 * Returns the binary operator's return type.
-	 *
-	 * @param function Function to extract the return type from
-	 * @param baseClass Base class of the function
-	 * @param hasIterables True if the first function parameter is an iterable, otherwise false
-	 * @param hasCollector True if the function has an additional collector parameter, otherwise false
-	 * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type)
-	 * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type)
-	 * @param functionName Function name
-	 * @param allowMissing Can the type information be missing
-	 * @param <IN1> Left side input type
-	 * @param <IN2> Right side input type
-	 * @param <OUT> Output type
-	 * @return TypeInformation of the return type of the function
-	 */
-	@SuppressWarnings("unchecked")
-	@PublicEvolving
-	public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(
-		Function function,
-		Class<?> baseClass,
-		boolean hasIterables,
-		boolean hasCollector,
-		TypeInformation<IN1> in1Type,
-		TypeInformation<IN2> in2Type,
-		String functionName,
-		boolean allowMissing) {
-
-		return getBinaryOperatorReturnType(
-			function,
-			baseClass,
-			hasIterables ? 0 : -1,
-			hasCollector ? 0 : -1,
-			in1Type,
-			in2Type,
-			functionName,
-			allowMissing
-		);
+	private static Type extractType(
+			LambdaExecutable exec,
+			int[] lambdaTypeArgumentIndices,
+			int paramLen,
+			int baseParametersLen) {
+		Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]];
+		for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
+			output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]);
+		}
+		return output;
+	}

+	private static Method getSingleAbstractMethod(Class<?> baseClass) {

— End diff –

          Can you document and move this to TypeExtractionUtils?
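To make the index walk in the diff above concrete: the first entry of an index array selects a parameter of the SAM, and each subsequent entry descends one level into that parameter's generic type arguments. A standalone sketch of the same idea (illustrative only: `Sam` and `walk` are my own stand-ins, not Flink's classes):

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class IndexWalk {

    // A made-up SAM shaped like a window function: key, iterable input, collecting output.
    interface Sam {
        void apply(String key, Iterable<Integer> in, List<String> out);
    }

    // indices[0] picks the parameter; each later index picks a type argument.
    static Type walk(Method sam, int[] indices) {
        Type t = sam.getGenericParameterTypes()[indices[0]];
        for (int i = 1; i < indices.length; i++) {
            t = ((ParameterizedType) t).getActualTypeArguments()[indices[i]];
        }
        return t;
    }

    public static void main(String[] args) throws Exception {
        Method m = Sam.class.getMethod("apply", String.class, Iterable.class, List.class);
        System.out.println(walk(m, new int[] {0}));    // parameter 0: String
        System.out.println(walk(m, new int[] {1, 0})); // Iterable<Integer> -> Integer
        System.out.println(walk(m, new int[] {2, 0})); // List<String> -> String
    }
}
```

This mirrors why `{1, 0}` reads as "type argument 0 of parameter 1", e.g. the element type of an `Iterable` input.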


          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120389039

— Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java —

@@ -422,37 +521,52 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
 			throw new InvalidTypesException("Internal error occurred.", e);
 		}
 		if (exec != null) {
+			Preconditions.checkArgument(
+				lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1,
+				"Indices for input type arguments within lambda not provided");
+			Preconditions.checkArgument(
+				lambdaOutputTypeArgumentIndices != null,
+				"Indices for output type arguments within lambda not provided");
 			// check for lambda type erasure
 			validateLambdaGenericParameters(exec);

 			// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-			final int paramLen = exec.getParameterTypes().length - 1;
+			final int paramLen = exec.getParameterTypes().length;
+
+			final Method sam = getSingleAbstractMethod(baseClass);
+			final int baseParametersLen = sam.getParameterTypes().length;

— End diff –

          Can you add an inline comment to both `paramLen` and `baseParametersLen`?


          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120387120

— Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java —

@@ -354,49 +479,21 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
 	/**
 	 * Returns the unary operator's return type.
 	 *
-	 * @param function Function to extract the return type from
-	 * @param baseClass Base class of the function
-	 * @param hasIterable True if the first function parameter is an iterable, otherwise false
-	 * @param hasCollector True if the function has an additional collector parameter, otherwise false
-	 * @param inType Type of the input elements (In case of an iterable, it is the element type)
-	 * @param functionName Function name
-	 * @param allowMissing Can the type information be missing
-	 * @param <IN> Input type
-	 * @param <OUT> Output type
-	 * @return TypeInformation of the return type of the function
-	 */
-	@SuppressWarnings("unchecked")
-	@PublicEvolving
-	public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(
-		Function function,
-		Class<?> baseClass,
-		boolean hasIterable,
-		boolean hasCollector,
-		TypeInformation<IN> inType,
-		String functionName,
-		boolean allowMissing) {
-
-		return getUnaryOperatorReturnType(
-			function,
-			baseClass,
-			hasIterable ? 0 : -1,
-			hasCollector ? 0 : -1,
-			inType,
-			functionName,
-			allowMissing);
-	}
-
-	/**
-	 * Returns the unary operator's return type.
+	 * <p><b>NOTE:</b> lambda type indices allows extraction of Type from lambdas. To extract input type <b>IN</b>

— End diff –

          allow


          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r120391605

— Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java —

@@ -464,57 +578,52 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
 		}
 	}

-	/**
-	 * Returns the binary operator's return type.
-	 *
-	 * @param function Function to extract the return type from
-	 * @param baseClass Base class of the function
-	 * @param hasIterables True if the first function parameter is an iterable, otherwise false
-	 * @param hasCollector True if the function has an additional collector parameter, otherwise false
-	 * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type)
-	 * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type)
-	 * @param functionName Function name
-	 * @param allowMissing Can the type information be missing
-	 * @param <IN1> Left side input type
-	 * @param <IN2> Right side input type
-	 * @param <OUT> Output type
-	 * @return TypeInformation of the return type of the function
-	 */
-	@SuppressWarnings("unchecked")
-	@PublicEvolving
-	public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(
-		Function function,
-		Class<?> baseClass,
-		boolean hasIterables,
-		boolean hasCollector,
-		TypeInformation<IN1> in1Type,
-		TypeInformation<IN2> in2Type,
-		String functionName,
-		boolean allowMissing) {
+	private static Type extractType(

— End diff –

          Please give this method a more meaningful name (e.g. extractTypeFromLambda) and a Javadoc. You can also move it to `TypeExtractionUtils`.


          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4039

          Now, let's wait for @twalthr in case he has any comments.


          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r119869362

— Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java —

@@ -422,37 +521,52 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
 			throw new InvalidTypesException("Internal error occurred.", e);
 		}
 		if (exec != null) {
+			Preconditions.checkArgument(
+				lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1,
+				"Indices for input type arguments within lambda not provided");
+			Preconditions.checkArgument(
+				lambdaOutputTypeArgumentIndices != null,
+				"Indices for output type arguments within lambda not provided");
 			// check for lambda type erasure
 			validateLambdaGenericParameters(exec);

 			// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-			final int paramLen = exec.getParameterTypes().length - 1;
+			final int paramLen = exec.getParameterTypes().length;
+
+			final Method sam = getSingleAbstractMethod(baseClass);
+			final int baseParametersLen = sam.getParameterTypes().length;

— End diff –

          I see, thank you!


          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r119868212

— Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java —

@@ -422,37 +521,52 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
 			throw new InvalidTypesException("Internal error occurred.", e);
 		}
 		if (exec != null) {
+			Preconditions.checkArgument(
+				lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1,
+				"Indices for input type arguments within lambda not provided");
+			Preconditions.checkArgument(
+				lambdaOutputTypeArgumentIndices != null,
+				"Indices for output type arguments within lambda not provided");
 			// check for lambda type erasure
 			validateLambdaGenericParameters(exec);

 			// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-			final int paramLen = exec.getParameterTypes().length - 1;
+			final int paramLen = exec.getParameterTypes().length;
+
+			final Method sam = getSingleAbstractMethod(baseClass);
+			final int baseParametersLen = sam.getParameterTypes().length;

— End diff –

Kind of. `baseParametersLen` is the number of arguments that the interface's method has, so it is the "array" that the indices point into.

`paramLen` is the number of parameters of the lambda, which can have additional parameters, e.g. from the closure. We need both of those lengths to index the argument correctly.
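The prepending of captured locals can be observed directly via `SerializedLambda`. A sketch I wrote to illustrate the point (Flink's `LambdaExecutable` is obtained through a similar `writeReplace` trick, though the exact internals differ):

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

public class LambdaParams {

    // Serializable functional interface, so the JVM generates writeReplace().
    interface SerFunction<T, R> extends java.util.function.Function<T, R>, Serializable {}

    // Pull the SerializedLambda out of a serializable lambda instance.
    static SerializedLambda extract(Serializable lambda) throws Exception {
        Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        return (SerializedLambda) writeReplace.invoke(lambda);
    }

    public static void main(String[] args) throws Exception {
        String captured = "suffix";
        SerFunction<Integer, String> plain = i -> "v" + i;
        SerFunction<Integer, String> capturing = i -> i + captured;

        // The SAM (Function#apply) declares one parameter, but the synthetic
        // method behind `capturing` takes two: the captured local is prepended.
        System.out.println(extract(plain).getImplMethodSignature());
        System.out.println(extract(capturing).getImplMethodSignature());
    }
}
```

The second signature starts with an extra `Ljava/lang/String;` slot for the closure variable, which is exactly why the extractor must index parameters from behind.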


          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4039#discussion_r119851342

— Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java —

@@ -422,37 +521,52 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
 			throw new InvalidTypesException("Internal error occurred.", e);
 		}
 		if (exec != null) {
+			Preconditions.checkArgument(
+				lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1,
+				"Indices for input type arguments within lambda not provided");
+			Preconditions.checkArgument(
+				lambdaOutputTypeArgumentIndices != null,
+				"Indices for output type arguments within lambda not provided");
 			// check for lambda type erasure
 			validateLambdaGenericParameters(exec);

 			// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-			final int paramLen = exec.getParameterTypes().length - 1;
+			final int paramLen = exec.getParameterTypes().length;
+
+			final Method sam = getSingleAbstractMethod(baseClass);
+			final int baseParametersLen = sam.getParameterTypes().length;

— End diff –

          What is the difference between `paramLen` and `baseParametersLen`? Is it that the first is the number of generic parameters of the SAM itself and `baseParametersLen` is the number of parameters of the method?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4039

          Thanks, and yes, I see my mistake now.

          I'll review, but could you please also have a look, @twalthr? I think you have more experience with this than I do.

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on the issue:

          https://github.com/apache/flink/pull/4039

          I think it is ready for a review.

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on the issue:

          https://github.com/apache/flink/pull/4039

          In fact `new int[]{0,1,0,0}`, but I think you got the point.
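          The index navigation discussed in this thread can be sanity-checked with plain reflection. The `walk` helper below is hypothetical (not the real `TypeExtractor` API): `indices[0]` selects the SAM parameter, and each further index descends one level into the type arguments of the current parameterized type. It reproduces both the `{0,1,0}` example and the `{0,1,0,0}` correction for the nested variant.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

public class IndexWalk {

    // The two example interfaces from this discussion.
    interface Dummy<IN, OUT> {
        OUT apply(Map<String, List<IN>> value);
    }

    interface NestedDummy<IN, OUT> {
        OUT apply(Map<String, Map<List<IN>, String>> value);
    }

    // Hypothetical sketch: indices[0] picks the method parameter, every
    // further index picks a type argument of the current parameterized type.
    static Type walk(Method sam, int[] indices) {
        Type current = sam.getGenericParameterTypes()[indices[0]];
        for (int i = 1; i < indices.length; i++) {
            current = ((ParameterizedType) current).getActualTypeArguments()[indices[i]];
        }
        return current;
    }

    public static void main(String[] args) throws Exception {
        Method apply = Dummy.class.getMethod("apply", Map.class);
        // {0,1,0}: parameter 0 -> Map's 2nd type argument (List<IN>) -> List's 1st (IN)
        System.out.println(walk(apply, new int[]{0, 1, 0}));     // IN

        Method nested = NestedDummy.class.getMethod("apply", Map.class);
        // {0,1,0,0}: Map -> Map<List<IN>, String> -> List<IN> -> IN
        System.out.println(walk(nested, new int[]{0, 1, 0, 0})); // IN
    }
}
```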

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4039

          Ah, so it performs nested navigation. If I had a type like
          ```
          public interface Dummy<IN, OUT> {
              OUT apply(Map<String, Map<List<IN>, String>> value);
          }
          ```
          it would be `new int[]{0,1,0,1}`?

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on the issue:

          https://github.com/apache/flink/pull/4039

          Sure, the arrays describe where the input/output type parameters sit in the lambda's signature. `lambdaInputTypeArgumentIndices[0]` is the index of the lambda argument; `lambdaInputTypeArgumentIndices[1]` is the index of the type argument within that argument, if it is a generic type, and so on.

          So for example, if we have a functional interface like
          `public interface Dummy<IN, OUT> { OUT apply(Map<String, List<IN>> value) }`,
          to extract the input type *IN* from a lambda you need to provide an array like `new int[]{0,1,0}`.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4039

          I like the idea of explicitly passing the argument parameters a lot! I would even suggest to remove the old methods with `hasCollector` and so on instead of just deprecating.

          Could you please elaborate a bit how the lambda argument arrays work?

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on the issue:

          https://github.com/apache/flink/pull/4039

          The PR is not fully ready yet, but as it touches a critical part, I would like to hear early opinions from somebody more experienced with the type system. It would be good to know if I am going the right way.

          githubbot ASF GitHub Bot added a comment -

          GitHub user dawidwys opened a pull request:

          https://github.com/apache/flink/pull/4039

          FLINK-6783 Changed passing index of type argument while extracting return type.

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [ ] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/dawidwys/flink flink-6783

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4039.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4039


          commit cb20120ccbaac1ebf3c2f8aafd4d3e82e5fe9fc1
          Author: Dawid Wysakowicz <dawid@getindata.com>
          Date: 2017-06-01T11:17:25Z

          FLINK-6783 Changed passing index of type argument while extracting return type.


          dawidwys Dawid Wysakowicz added a comment -

          Sure Aljoscha Krettek, will do!

          aljoscha Aljoscha Krettek added a comment -

          Dawid Wysakowicz I think you could also change the DummyAggregationFunction used in the tests to have different types for IN, ACC, and OUT when fixing this. What do you think?

          twalthr Timo Walther added a comment -

          +1 for this change.


            People

            • Assignee:
              dawidwys Dawid Wysakowicz
              Reporter:
              dawidwys Dawid Wysakowicz
            • Votes:
              0
              Watchers:
              6
