  1. Flink
  2. FLINK-3138

Method References are not supported as lambda expressions

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.2
    • Fix Version/s: 1.0.0, 1.2.0
    • Component/s: Core
    • Labels:
      None

      Description

      For many functions (here for example KeySelectors), one can use lambda expressions:

      DataStream<MyType> stream = ...;
      stream.keyBy( v -> v.getId() )
      

      Java's other syntax for this, method references, does not work:

      DataStream<MyType> stream = ...;
      stream.keyBy( MyType::getId )
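
For context, the two forms are interchangeable in plain Java. The following is a minimal sketch without any Flink dependency: `KeySelector` here is a hypothetical, simplified stand-in for Flink's interface (no checked exception), and `MyType`, `applyKey`, and the class name are illustrative assumptions, not code from the report.

```java
import java.io.Serializable;

public class MethodRefEquivalence {

    // Hypothetical, simplified stand-in for Flink's KeySelector:
    // a serializable interface with a single abstract method.
    interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value);
    }

    // Hypothetical element type with an id getter, mirroring MyType in the report.
    static class MyType {
        private final int id;

        MyType(int id) {
            this.id = id;
        }

        public Integer getId() {
            return id;
        }
    }

    // Applies a key selector to one value, as a keyed operator would per element.
    static <IN, KEY> KEY applyKey(KeySelector<IN, KEY> selector, IN value) {
        return selector.getKey(value);
    }

    public static void main(String[] args) {
        MyType v = new MyType(42);

        // Lambda expression form.
        KeySelector<MyType, Integer> viaLambda = x -> x.getId();
        // Unbound instance method reference form; the receiver is the argument.
        KeySelector<MyType, Integer> viaMethodRef = MyType::getId;

        System.out.println(applyKey(viaLambda, v));    // prints 42
        System.out.println(applyKey(viaMethodRef, v)); // prints 42
    }
}
```

Both selectors compile to the same functional interface, so any difference in behaviour has to come from how a framework inspects them, not from the Java language itself.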
      


          Activity

          aljoscha Aljoscha Krettek added a comment -

          How does this manifest?

          I changed the WordCount (streaming) example in flink-java8 to this and it still works:

          public class WordCount {
          	
          	// *************************************************************************
          	//     PROGRAM
          	// *************************************************************************
          
          	public static String keyIt(Tuple2<String, Integer> e) {
          		return e.f0;
          	}
          
          	public static void main(String[] args) throws Exception {
          		
          		if(!parseParameters(args)) {
          			return;
          		}
          		
          		// set up the execution environment
          		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          		
          		// get input data
          		DataStream<String> text = getTextDataStream(env);
          		
          		DataStream<Tuple2<String, Integer>> counts = 
          				// normalize and split each line
          				text.map(line -> line.toLowerCase().split("\\W+")).returns(String[].class)
          				// convert each split line into pairs (2-tuples) of the form (word, 1)
          				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
          					// emit the pairs with non-zero-length words
          					Arrays.stream(tokens)
          					.filter(t -> t.length() > 0)
          					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
          				}).returns("Tuple2<String, Integer>")
          				// group by the tuple field "0" and sum up tuple field "1"
          				.keyBy(WordCount::keyIt)
          				.sum(1);
          
          		// emit result
          		if(fileOutput) {
          			counts.writeAsCsv(outputPath, 1);
          		} else {
          			counts.print();
          		}
          		
          		// execute program
          		env.execute("Streaming WordCount Example");
          	}
          	
          	// *************************************************************************
          	//     UTIL METHODS
          	// *************************************************************************
          	
          	private static boolean fileOutput = false;
          	private static String textPath;
          	private static String outputPath;
          	
          	private static boolean parseParameters(String[] args) {
          		
          		if(args.length > 0) {
          			// parse input arguments
          			fileOutput = true;
          			if(args.length == 2) {
          				textPath = args[0];
          				outputPath = args[1];
          			} else {
          				System.err.println("Usage: WordCount <text path> <result path>");
          				return false;
          			}
          		} else {
          			System.out.println("Executing WordCount example with built-in default data.");
          			System.out.println("  Provide parameters to read input data from a file.");
          			System.out.println("  Usage: WordCount <text path> <result path>");
          		}
          		return true;
          	}
          	
          	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
          		if (fileOutput) {
          			// read the text file from given input path
          			return env.readTextFile(textPath);
          		} else {
          			// get default test text data
          			return env.fromElements(WordCountData.WORDS);
          		}
          	}
          }
          
          StephanEwen Stephan Ewen added a comment -

          Try using a custom type with a getKey() method. The referenced method is then non-static and parameterless (its one parameter is implicitly this), unlike the static keyIt method in the example above.
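
The distinction between the two kinds of method reference can be made visible with `java.lang.invoke.SerializedLambda`, which a serializable lambda exposes through its synthetic `writeReplace` method. This is only a sketch under assumptions: `KeySelector` is a hypothetical, simplified stand-in for Flink's interface, `MyType`, `keyOf`, `inspect`, and `describe` are illustrative names, and the code is not taken from Flink's TypeExtractor.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

public class LambdaShapes {

    // Hypothetical, simplified stand-in for Flink's KeySelector.
    interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value);
    }

    // Hypothetical custom type offering both a static and an instance accessor.
    static class MyType {
        private final String key;

        MyType(String key) {
            this.key = key;
        }

        // Instance method: no declared parameters, the input is the receiver.
        public String getKey() {
            return key;
        }

        // Static method: the input is an explicit parameter.
        public static String keyOf(MyType t) {
            return t.key;
        }
    }

    // Reads the SerializedLambda of a serializable lambda or method reference
    // by reflectively calling the compiler-generated writeReplace method.
    static SerializedLambda inspect(Serializable lambda) {
        try {
            Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            return (SerializedLambda) writeReplace.invoke(lambda);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Not a serializable lambda", e);
        }
    }

    // Returns the implementation method's name followed by its JVM signature.
    static String describe(Serializable lambda) {
        SerializedLambda sl = inspect(lambda);
        return sl.getImplMethodName() + sl.getImplMethodSignature();
    }

    public static void main(String[] args) {
        KeySelector<MyType, String> staticRef = MyType::keyOf;
        KeySelector<MyType, String> instanceRef = MyType::getKey;

        // The static reference's signature lists MyType as a parameter;
        // the instance reference's signature has an empty parameter list.
        System.out.println(describe(staticRef));
        System.out.println(describe(instanceRef));
    }
}
```

Code that derives the input type from the implementation method's parameter list sees one parameter for the static reference and none for the instance reference, which is why a test using only a static method would not exercise the reported case.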

          githubbot ASF GitHub Bot added a comment -

          GitHub user twalthr opened a pull request:

          https://github.com/apache/flink/pull/2329

          FLINK-3138 [types] Method References are not supported as lambda expressions

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [x] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [x] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [x] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          This PR fixes a small bug in the TypeExtractor and adds more tests to verify Java 8 method references.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/twalthr/flink FLINK-3138

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2329.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2329


          commit c7ed2239022b8ea1b1322054d6a9d4c329034a83
          Author: twalthr <twalthr@apache.org>
          Date: 2016-08-03T12:18:40Z

          FLINK-3138 [types] Method References are not supported as lambda expressions


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2329

          Looks good to me. Has nice tests and the Travis CI failures are cache/jar file issues, i.e., unrelated.

          +1 to merge this

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/2329

          Thanks Stephan. Merging...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2329

          twalthr Timo Walther added a comment -

          Fixed in ff777084ba2e1f1070ce5ecbc5afc122756ba851.


            People

            • Assignee:
              twalthr Timo Walther
              Reporter:
              StephanEwen Stephan Ewen