  1. Flink
  2. FLINK-22247

Cannot pass an Address list when connecting to RabbitMQ

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.12.2
    • Fix Version/s: None
    • Component/s: API / DataStream
    • Environment:

      flink: 1.12.2

      rabbitmq: 3.8.4

      Description

      We want to connect to a RabbitMQ cluster through the RabbitMQ connector by passing a list of broker addresses. To do this we override the setupConnection function so that it passes the cluster addresses, but the com.rabbitmq.client.Address class is not serializable, so Flink throws an exception when the source is added to the job.

      // Reproduction: the Address array below is captured by the anonymous RMQSource subclass
      val rabbitmqAddresses = Array(
        new Address("xxx1", 5672),
        new Address("xxx2", 5672),
        new Address("xxx3", 5672))
      
      val dataStream = streamEnv
        .addSource(new RMQSource[String](
          rabbitmqConfig, // RabbitMQ connection config
          "queueName", // queue name
          true, // use correlation IDs, needed for exactly-once consumption from RabbitMQ
          new SimpleStringSchema // deserialization schema
        ) {
          override def setupQueue(): Unit = {}
      
          override def setupConnection(): Connection = {
            rabbitmqConfig.getConnectionFactory.newConnection(rabbitmqAddresses)
          }
        }).setParallelism(1)
      

      Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object probably contains or references non serializable fields.
          at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
          at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
          at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1685)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1652)
          at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:693)
          at testConsumer$.main(testConsumer.scala:30)
          at testConsumer.main(testConsumer.scala)
      Caused by: java.io.NotSerializableException: com.rabbitmq.client.Address
          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
          at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
          at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
          at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
          at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
          ... 9 more
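
      The trace shows that the Address array is serialized together with the source function. One possible workaround, sketched here and not verified against 1.12.2, is to capture only serializable (host, port) pairs and build the Address objects inside setupConnection, which runs after the function has been serialized. The helper toAddresses, the name hostPorts, the object name, and the connection settings (virtual host, user name, password) are assumptions made for illustration and are not part of the original report.

```scala
import com.rabbitmq.client.{Address, Connection}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig

object RmqClusterSourceSketch {

  // Hypothetical helper: turns serializable (host, port) pairs into Address objects.
  def toAddresses(hostPorts: Array[(String, Int)]): Array[Address] =
    hostPorts.map { case (host, port) => new Address(host, port) }

  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder connection settings (assumed values, not taken from the report).
    val rabbitmqConfig = new RMQConnectionConfig.Builder()
      .setHost("xxx1")
      .setPort(5672)
      .setVirtualHost("/")
      .setUserName("guest")
      .setPassword("guest")
      .build()

    // Only Strings and Ints are captured by the source, so the closure stays serializable.
    val hostPorts = Array(("xxx1", 5672), ("xxx2", 5672), ("xxx3", 5672))

    val source = new RMQSource[String](
      rabbitmqConfig,        // RabbitMQ connection config
      "queueName",           // queue name
      true,                  // use correlation IDs
      new SimpleStringSchema // deserialization schema
    ) {
      override def setupQueue(): Unit = {}

      // Address objects are created here, on the task side, and never serialized.
      override def setupConnection(): Connection =
        rabbitmqConfig.getConnectionFactory.newConnection(toAddresses(hostPorts))
    }

    streamEnv.addSource(source).setParallelism(1).print()
    streamEnv.execute("rmq-cluster-source-sketch")
  }
}
```

      This only sidesteps the serialization check; direct support for an address list in the connector configuration would still be the cleaner improvement requested here.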

            People

            • Assignee: Unassigned
            • Reporter: SpongebobZ Spongebob
            • Votes: 0
            • Watchers: 1
