Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-18006

It will throw Invalid lambda deserialization Exception when writing to elastic search with new format

    XMLWordPrintableJSON

Details

    Description

      My job follows:

      // 
      create table csv( pageId VARCHAR, eventId VARCHAR, recvTime VARCHAR) with ( 'connector' = 'filesystem',
       'path' = '/Users/ohmeatball/Work/flink-sql-etl/data-generator/src/main/resources/user3.csv',
       'format' = 'csv'
       )
      -----------------------------------------
      CREATE TABLE es_table (
        aggId varchar ,
        pageId varchar ,
        ts varchar ,
        expoCnt int ,
        clkCnt int
      ) WITH (
      'connector' = 'elasticsearch',
      'hosts' = 'http://localhost:9200',
      'index' = 'cli_test',
      'document-id.key-delimiter' = '$',
      'sink.bulk-flush.interval' = '1000',
      'format' = 'json'
      )
      -----------------------------------------
      INSERT INTO es_table
      SELECT  pageId,eventId,cast(recvTime as varchar) as ts, 1, 1 from csv;
      

      The full exception follows:

      Sink(table=[default_catalog.default_database.es_table], fields=[aggId, pageId, ts, expoCnt, clkCnt]) (1/1) (b51209fac96948c20e85b8df137287d3) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@bb5ab41.Sink(table=[default_catalog.default_database.es_table], fields=[aggId, pageId, ts, expoCnt, clkCnt]) (1/1) (b51209fac96948c20e85b8df137287d3) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@bb5ab41.org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:518) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:720) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:545) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_151]Caused by: java.io.IOException: unexpected exception type at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682) ~[?:1.8.0_151] at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) ~[?:1.8.0_151] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 10 more
      Caused by: java.lang.reflect.InvocationTargetExceptionCaused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_151] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_151] at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_151] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_151] at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) ~[?:1.8.0_151] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 10 moreCaused by: java.lang.IllegalArgumentException: Invalid lambda deserialization at org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:80) ~[flink-sql-connector-elasticsearch7_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_151] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_151] at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_151] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_151] at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151]
       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) ~[?:1.8.0_151] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 10 more
      

      Notice: everything works fine with former connector grammer.

      Attachments

        Issue Links

          Activity

            People

              dwysakowicz Dawid Wysakowicz
              fsk119 Shengkai Fang
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: