Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-10682

Windows Kafka cluster not reachable via Azure data Bricks

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 2.6.0
    • None
    • consumer
    • None

    Description

      We have windows Kafka cluster,

      • We enabled inbound and outbound for port 9092/9093
      • Topic return results on local windows cmd used
        • ./kafka-console-consumer.bat --topic SIP.SIP.SHIPMENT --from-beginning --bootstrap-server 10.53.56.140:9092
      • We trying to consume the topic from Azure data bricks
        • Simple ping and telnet works fine and connects to underlying server 
          • %sh telnet 10.53.56.140 9092
          • %sh ping 10.53.56.140
        • df = spark \
          .readStream \
          .format("kafka") \
          .option("kafka.bootstrap.servers", "10.53.56.140:9092") \
          .option("subscribe", "SIP.SIP.SHIPMENT") \
          .option("minPartitions", "10") \
          .option("startingOffsets", "earliest") \
          .load()
          #df.isStreaming() # Returns True for DataFrames that have streaming sources

      df.printSchema()

        • Display(df)

      On using display command after before amount of time we got below error:

      Lost connection to cluster. The notebook may have been detached or the cluster may have been terminated due to an error in the driver such as an OutOfMemoryError.

        What we see in Logs is below error

      20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)20/11/04 18:23:52 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0-5, groupId=spark-kafka-source-515ba67c-f265-4577-935b-5c7ba954a31d--1012371861-driver-0] Error connecting to node Navin.us.corp.tim.com:9092 (id: 0 rack: null)java.net.UnknownHostException: Navin.us.corp.tim.com at java.net.InetAddress.getAllByName0(InetAddress.java:1281) at java.net.InetAddress.getAllByName(InetAddress.java:1193) at java.net.InetAddress.getAllByName(InetAddress.java:1127) at kafkashaded.org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403) at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363) at kafkashaded.org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151) at kafkashaded.org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949) at kafkashaded.org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71) at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122) at kafkashaded.org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010) at kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161) at kafkashaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240) at kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235) at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:540) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:602) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77) at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:601) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:538) at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:569) at org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:538) at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:300) at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148) at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:398) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:398) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:71) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:391) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.immutable.Map$Map1.foreach(Map.scala:128) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:388) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:619) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:384) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:216) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:71) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:199) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:193) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:346) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:259)

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            sneevand navin
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: