Flink / FLINK-17946

The config option "pipeline.jars" doesn't work if the job is executed via TableEnvironment.execute_sql or StatementSet.execute


Details

    Description

      For the following job:

      import logging
      import sys
      import tempfile
      
      from pyflink.table import BatchTableEnvironment, EnvironmentSettings
      
      
      def word_count():
          content = "line Licensed to the Apache Software Foundation ASF under one " \
                    "line or more contributor license agreements See the NOTICE file " \
                    "line distributed with this work for additional information " \
                    "line regarding copyright ownership The ASF licenses this file " \
                    "to you under the Apache License Version the " \
                    "License you may not use this file except in compliance " \
                    "with the License"
      
          environment_settings = EnvironmentSettings.new_instance().in_batch_mode().\
              use_blink_planner().build()
          t_env = BatchTableEnvironment.create(environment_settings=environment_settings)
          t_env.get_config().get_configuration().set_string(
              "pipeline.jars",
              "file:///Users/dianfu/workspace/wordcount_python/flink-csv-1.11.0-sql-jar.jar")
      
          # register Results table in table environment
          tmp_dir = tempfile.gettempdir()
          result_path = tmp_dir + '/result'
      
          logging.info("Results directory: %s", result_path)
      
          sink_ddl = """
              create table Results(
                  word VARCHAR,
                  `count` BIGINT
              ) with (
                  'connector' = 'filesystem',
                  'format' = 'csv',
                  'path' = '{}'
              )
              """.format(result_path)
          t_env.execute_sql(sink_ddl)
      
          elements = [(word, 1) for word in content.split(" ")]
          table = t_env.from_elements(elements, ["word", "count"]) \
              .group_by("word") \
              .select("word, count(1) as count")
      
          statement_set = t_env.create_statement_set()
          statement_set.add_insert("Results", table, overwrite=True)
          statement_set.execute()
      
      
      if __name__ == '__main__':
          logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
      
          word_count()
      

      It throws the following exception:

      Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.table.filesystem.FileSystemOutputFormat.formatFactory of type org.apache.flink.table.filesystem.OutputFormatFactory in instance of org.apache.flink.table.filesystem.FileSystemOutputFormat
      	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
      	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
      	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237)
      	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
      	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
      	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
      	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
      	at java.util.HashMap.readObject(HashMap.java:1404)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
      	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
      	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
      	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
      	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
      	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
      	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
      	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
      	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
      	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
      	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
      	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
      	at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
      	... 23 more
      

      The cause is that StatementSet.execute does not handle the "pipeline.jars" option properly, so the jar specified via "pipeline.jars" cannot be found. The same issue exists in TableEnvironment.execute_sql.
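
      When reproducing this, it may help to first rule out a plain path mistake in the option value. The following is a minimal sketch, assuming the value is a semicolon-separated list of file:// URLs, as in the job above. The helper name missing_pipeline_jars is hypothetical and not part of PyFlink; it only checks the local filesystem and does not touch Flink.

```python
import os
from urllib.parse import urlparse
from urllib.request import url2pathname


def missing_pipeline_jars(pipeline_jars):
    """Return the entries of a "pipeline.jars" value that are not existing local files.

    Hypothetical helper: assumes a semicolon-separated list of file:// URLs.
    Entries with any other scheme are reported as missing, because this
    sketch cannot check them.
    """
    missing = []
    for entry in pipeline_jars.split(";"):
        entry = entry.strip()
        if not entry:
            # Ignore empty segments such as a trailing ";".
            continue
        parsed = urlparse(entry)
        local_path = url2pathname(parsed.path)
        if parsed.scheme != "file" or not os.path.isfile(local_path):
            missing.append(entry)
    return missing


if __name__ == "__main__":
    value = "file:///Users/dianfu/workspace/wordcount_python/flink-csv-1.11.0-sql-jar.jar"
    print("Entries not found locally:", missing_pipeline_jars(value))
```

      If the list comes back empty and the ClassCastException above still occurs, the jar path itself is unlikely to be the problem.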

            People

              zhongwei Wei Zhong
              dian.fu Dian Fu