  1. Zeppelin
  2. ZEPPELIN-5877

IllegalArgumentException when running pyflink interpreter with user jars



    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.10.1
    • Fix Version/s: None
    • Component/s: zeppelin-interpreter
    • Labels: None


      Calling certain Java constructors and methods from a user-provided jar (added through "flink.execution.jars") fails with "java.lang.IllegalArgumentException: argument type mismatch" when the call involves enums or static fields.

      An example paragraph: 

      from pyflink.java_gateway import get_gateway
      gateway = get_gateway()
      jvm = gateway.jvm
      myInstance = jvm.org.example.MyClass()
      param = jvm.org.example.ParameterContainer.staticField
      # either one of these two lines will throw IllegalArgumentException on the second run of the paragraph


      java.lang.IllegalArgumentException: argument type mismatch
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
          at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
          at py4j.Gateway.invoke(Gateway.java:282)
          at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
          at py4j.commands.CallCommand.execute(CallCommand.java:79)
          at py4j.GatewayConnection.run(GatewayConnection.java:238)
          at java.lang.Thread.run(Thread.java:750)

      A zip containing the class definitions is attached (pyflink-test-jar.zip); it can be used to build the jar and reproduce the error.

      The error does not occur on the first run of the paragraph, but it occurs on every subsequent run of the same code. After the Flink interpreter is restarted, we get one more successful run before the error appears again.
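One way to investigate the difference between runs is to print which class loader an object's class came from on each run. The helper below is only a sketch: `describe_loader` is a hypothetical name, and it assumes the object handed to it exposes the standard Java methods `getClass()`, `getClassLoader()` and `toString()`, as an object obtained through the gateway would.

```python
def describe_loader(java_object):
    """Return a string identifying the class loader of java_object's class.

    Hypothetical helper: java_object is assumed to expose the standard
    java.lang.Object / java.lang.Class methods used below.
    """
    loader = java_object.getClass().getClassLoader()
    # Class.getClassLoader() returns null (None here) for bootstrap classes.
    if loader is None:
        return "bootstrap"
    return loader.toString()
```

In the example paragraph above, this could be called as `print(describe_loader(myInstance))` and `print(describe_loader(param))` on two consecutive runs, to compare the loaders reported for each object.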

      We first encountered this when using the pyflink Kafka connector: the paragraph below produces the same error. It requires flink-connector-kafka_2.12 in flink.execution.jars to run.


      from pyflink.datastream.connectors import FlinkKafkaProducer 
      from pyflink.common.serialization import SimpleStringSchema
      # the call below eventually invokes a Java constructor that takes an enum parameter:
      # https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.html#FlinkKafkaProducer-java.lang.String-org.apache.flink.api.common.serialization.SerializationSchema-java.util.Properties-org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner-org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic-int- 
      producer = FlinkKafkaProducer("output-topic", SimpleStringSchema(), {"bootstrap.servers": "localhost:9092"})  


      I did some analysis and concluded that this bug results from a combination of two factors:
      1. Py4j caches method and constructor invokers.
      2. Zeppelin creates a new class loader on every paragraph run.

      When a paragraph is run, the user-provided Java classes are loaded on demand by the new class loader. When invoking a method or a constructor, however, Py4j uses the cached invokers, which hold class handles from the class loader of the very first run, so the callee and its parameter types may refer to old class handles. Static fields and enums, on the other hand, are not cached: Py4j creates and stores a new "Object" every time they are accessed. As a result, an enum value loaded by the new class loader and passed as a parameter to a callee loaded by an old class loader causes the argument type mismatch error.



      Attachments:
        1. pyflink-test-jar.zip (12 kB, Mohammad Kamar)



            Assignee: Unassigned
            Reporter: Mohammad Kamar (mkamar)