Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-12113

User code passing to fromCollection(Iterator, Class) not cleaned

    XMLWordPrintableJSON

Details

    Description

       

      interface IS<E> extends Iterator<E>, Serializable { }
      
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.fromCollection(new IS<Object>() {
          @Override
          public boolean hasNext() {
              return false;
          }
      
          @Override
          public Object next() {
              return null;
          }
      }, Object.class);
      

      Code piece above throws exception:

      org.apache.flink.api.common.InvalidProgramException: The implementation of the SourceFunction is not serializable. The object probably contains or references non serializable fields.
      
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
      ....

      And my workaround is wrapping clean around iterator instance, like this:

       

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.fromCollection(env.clean(new IS<Object>() {
          @Override
          public boolean hasNext() {
              return false;
          }
      
          @Override
          public Object next() {
              return null;
          }
      }), Object.class);
      

       

       

       

      Attachments

        1. image-2019-04-07-21-52-37-264.png
          419 kB
          Guowei Ma
        2. image-2019-04-08-23-19-27-359.png
          432 kB
          Guowei Ma

        Activity

          People

            Unassigned Unassigned
            vision57 yankai zhang
            Votes:
            0 Vote for this issue
            Watchers:
            8 Start watching this issue

            Dates

              Created:
              Updated: