Beam / BEAM-1255

java.io.NotSerializableException in flink on UnboundedSource

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 0.5.0
    • Fix Version/s: 0.5.0
    • Component/s: runner-flink
    • Labels: None

    Description

      After the new Coders with TypeDescriptor were introduced, the Flink runner fails with this exception:

      Caused by: java.io.NotSerializableException: sun.reflect.generics.reflectiveObjects.TypeVariableImpl
      	- element of array (index: 0)
      	- array (class "[Ljava.lang.Object;", size: 2)
      	- field (class "com.google.common.collect.ImmutableList$SerializedForm", name: "elements", type: "class [Ljava.lang.Object;")
      	- object (class "com.google.common.collect.ImmutableList$SerializedForm", com.google.common.collect.ImmutableList$SerializedForm@30af5b6b)
      	- field (class "com.google.common.reflect.Types$ParameterizedTypeImpl", name: "argumentsList", type: "class com.google.common.collect.ImmutableList")
      	- object (class "com.google.common.reflect.Types$ParameterizedTypeImpl", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
      	- field (class "com.google.common.reflect.TypeToken", name: "runtimeType", type: "interface java.lang.reflect.Type")
      	- object (class "com.google.common.reflect.TypeToken$SimpleTypeToken", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
      	- field (class "org.apache.beam.sdk.values.TypeDescriptor", name: "token", type: "class com.google.common.reflect.TypeToken")
      	- object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper$1", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
      	- field (class "org.apache.beam.sdk.coders.SerializableCoder", name: "typeDescriptor", type: "class org.apache.beam.sdk.values.TypeDescriptor")
      	- object (class "org.apache.beam.sdk.coders.SerializableCoder", SerializableCoder)
      	- field (class "org.apache.beam.sdk.coders.KvCoder", name: "keyCoder", type: "interface org.apache.beam.sdk.coders.Coder")
      	- object (class "org.apache.beam.sdk.coders.KvCoder", KvCoder(SerializableCoder,AvroCoder))
      	- field (class "org.apache.beam.sdk.coders.IterableLikeCoder", name: "elementCoder", type: "interface org.apache.beam.sdk.coders.Coder")
      	- object (class "org.apache.beam.sdk.coders.ListCoder", ListCoder(KvCoder(SerializableCoder,AvroCoder)))
      	- field (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", name: "checkpointCoder", type: "class org.apache.beam.sdk.coders.ListCoder")
      	- root object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@b2c5e07)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
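
      For context, a minimal JDK-only sketch of what the trace above appears to show: a type descriptor created inside a generic context keeps an unresolved type variable, and TypeVariableImpl is not Serializable. The names Captured, captureGeneric, captureConcrete and trySerialize are hypothetical stand-ins for illustration. They are not Beam's TypeDescriptor or the code in UnboundedSourceWrapper, where the same kind of value is held through Guava's TypeToken.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class TypeVariableSerializationSketch {

    /** Hypothetical stand-in for a type-capturing descriptor (not Beam's TypeDescriptor). */
    abstract static class Captured<T> implements Serializable {
        final Type type;

        Captured() {
            // Read the type argument that the anonymous subclass supplied for T.
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }
    }

    /** Captures T from a generic context, where it is still an unresolved type variable. */
    static <T> Captured<T> captureGeneric() {
        return new Captured<T>() {};
    }

    /** Captures a concrete type; the stored Type is String.class, which is Serializable. */
    static Captured<String> captureConcrete() {
        return new Captured<String>() {};
    }

    /** Returns "ok", or a short description if Java serialization rejects the value. */
    static String trySerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return "ok";
        } catch (NotSerializableException e) {
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("concrete: " + trySerialize(captureConcrete()));
        // The explicit <Integer> does not help: the anonymous class still records T.
        System.out.println("generic:  "
                + trySerialize(TypeVariableSerializationSketch.<Integer>captureGeneric()));
    }
}
```

      If this reading is right, the concrete capture serializes and the generic one is rejected, which matches the first line of the trace. Building the descriptor from a concrete class instead of from type parameters would avoid the problem in this sketch; whether that is what the actual fix does is not stated here.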
      

      The bug was introduced after commit:
      7b98fa08d14e8121e8885f00a9a9a878b73f81a6

      pull request:
      https://github.com/apache/beam/pull/1537

      Code to reproduce the error:

      import com.google.common.collect.ImmutableList;
      import org.apache.beam.runners.flink.FlinkPipelineOptions;
      import org.apache.beam.runners.flink.FlinkRunner;
      import org.apache.beam.sdk.Pipeline;
      import org.apache.beam.sdk.io.kafka.KafkaIO;
      import org.apache.beam.sdk.options.PipelineOptionsFactory;
      
      public class FlinkSerialisationError {
      
          public static void main(String[] args) {
              // Use the Flink runner in streaming mode.
              FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
              options.setRunner(FlinkRunner.class);
              options.setStreaming(true);
      
              Pipeline pipeline = Pipeline.create(options);
      
              // An unbounded Kafka read is the only transform in the pipeline.
              pipeline.apply(
                      KafkaIO.read()
                          .withBootstrapServers("localhost:9092")
                          .withTopics(ImmutableList.of("test"))
                          // set ConsumerGroup
                          .withoutMetadata());
      
              pipeline.run();
          }
      }
      

            People

              Assignee: Alexey Diomin
              Reporter: Alexey Diomin
              Votes: 0
              Watchers: 3
