FLINK-23360

"Bootstrap broker disconnected" warning when connecting a Spring Boot application to Azure Event Hubs over Kafka #543


    Details

      Description

      1. Article I followed to configure the Event Hub:
        https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quickstart-kafka-enabled-event-hubs (TLS/SSL)
      2. Code
        Configuration in the .properties file:

      ###############################################################################################################
      # Properties for Azure Event Hubs with Kafka
      ###############################################################################################################

      spring.kafka.bootstrap.servers=*****.servicebus.windows.net:9093
      spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://*******.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*************";
      spring.kafka.properties.security.protocol=SASL_SSL
      spring.kafka.properties.sasl.mechanism=PLAIN
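
      For reference, the four settings above correspond to plain Kafka client properties. The Java sketch below shows that mapping under stated assumptions: `EventHubsKafkaProps.forNamespace` is a hypothetical helper (not part of Kafka, Spring, or the reporter's code), and the namespace and connection string are placeholders supplied by the caller. The property keys are the standard Kafka client keys, written as string literals so the sketch needs no Kafka dependency.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka or Spring.
final class EventHubsKafkaProps {

    private EventHubsKafkaProps() {}

    // Builds the client properties for the Kafka endpoint of an Event Hubs namespace.
    static Map<String, String> forNamespace(String namespace, String connectionString) {
        Map<String, String> props = new LinkedHashMap<>();
        // The Kafka endpoint of an Event Hubs namespace listens on port 9093.
        props.put("bootstrap.servers", namespace + ".servicebus.windows.net:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // The username is the literal text "$ConnectionString";
        // the password is the namespace connection string itself.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"$ConnectionString\" "
                        + "password=\"" + connectionString + "\";");
        return props;
    }
}
```

      The resulting map can be compared entry by entry with what the running client actually receives, which helps rule out a mistyped key or a truncated connection string.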
      POM file:

      4.0.0

      ******-processor
      0.0.1-SNAPSHOT

      ******-common
      jar

      <project.root.directory>${basedir}/..</project.root.directory>

      Dependencies (groupId:artifactId:version):

      com..iot:*****-processor-data
      org.apache.kafka:kafka-clients:0.11.0.0
      org.apache.commons:commons-lang3:3.9
      org.apache.commons:commons-io:1.3.2
      commons-configuration:commons-configuration:1.10

      Kafka producer configuration:

      @Bean
      public ProducerFactory<String, byte[]> postDecoderFactory() throws FileNotFoundException {
          logger.info("BootStrap Server ");
          logger.info(new CommonUtility().getPropertyValue("spring.kafka.bootstrap.servers"));
          Map<String, Object> configProps = new HashMap<>();
          configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("spring.kafka.bootstrap.servers"));
          //configProps.put(ProducerConfig.ACKS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerAcks"));
          //configProps.put(ProducerConfig.RETRIES_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerRetries"));
          //configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBatchSize"));
          //configProps.put(ProducerConfig.LINGER_MS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerLingerMs"));
          //configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBufferMemory"));
          configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeySerializer"));
          configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("byteMessageSerializer"));
          //configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerCompressionType"));
          return new DefaultKafkaProducerFactory<>(configProps);
      }

      @Bean
      public KafkaTemplate<String, byte[]> byteArrayKafkaTemplate() throws FileNotFoundException {
          return new KafkaTemplate<>(postDecoderFactory());
      }
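
      One point worth checking against the factory above: a `DefaultKafkaProducerFactory` built from a hand-assembled map only receives the entries placed in that map, so the `spring.kafka.properties.*` security settings would need to be copied in explicitly. The sketch below shows one possible way to do that with plain `java.util` types. `KafkaSecurityProps.copyClientProperties` is a hypothetical helper, and the sketch assumes the application properties are available as a `java.util.Properties` object; how the reporter's `CommonUtility` loads them is not shown in the report.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Kafka or Spring.
final class KafkaSecurityProps {

    // Prefix used in the .properties file for pass-through Kafka client settings.
    static final String PREFIX = "spring.kafka.properties.";

    private KafkaSecurityProps() {}

    // Returns a copy of base with every "spring.kafka.properties.*" entry added
    // under its bare Kafka client key (for example "security.protocol").
    static Map<String, Object> copyClientProperties(Properties appProps, Map<String, Object> base) {
        Map<String, Object> merged = new HashMap<>(base);
        for (String name : appProps.stringPropertyNames()) {
            if (name.startsWith(PREFIX)) {
                merged.put(name.substring(PREFIX.length()), appProps.getProperty(name));
            }
        }
        return merged;
    }
}
```

      With a helper like this, the merged map would be passed to `new DefaultKafkaProducerFactory<>(...)` in place of `configProps`. The same consideration applies to any consumer factory that is assembled by hand.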

       

      Error:

      2021-07-12 16:09:55.111 WARN 13288 --- ntainer#2-0-C-1 org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-m2m-index-preprocessor-topic-cg-1, groupId=m2m-index-preprocessor-topic-cg] Bootstrap broker *******.servicebus.windows.net:9093 (id: -1 rack: null) disconnected

       

       


      People

      Assignee: Unassigned
      Reporter: Rajiya Mulani (rajiya_mul)
