SPARK-30522

Spark Streaming dynamic executors override or take default kafka parameters in cluster mode


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.2
    • Fix Version/s: None
    • Component/s: Java API

    Description

      I have written a Spark Streaming consumer to consume the data from Kafka, and I found weird behavior in my logs. The Kafka topic has 3 partitions, and for each partition an executor is launched by the Spark Streaming job.
      The first executor always takes the parameters I provided while creating the streaming context, but the executors with IDs 2 and 3 always override the Kafka parameters.

      Here is the log from the first executor:

      20/01/14 12:15:05 WARN StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log.
      20/01/14 12:15:05 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 2 write ahead log files from hdfs://tlabnamenode/checkpoint/receivedBlockMetadata    
      20/01/14 12:15:05 INFO DirectKafkaInputDStream: Slide time = 5000 ms    
      20/01/14 12:15:05 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
      20/01/14 12:15:05 INFO DirectKafkaInputDStream: Checkpoint interval = null
       20/01/14 12:15:05 INFO DirectKafkaInputDStream: Remember interval = 5000 ms    
      20/01/14 12:15:05 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@12665f3f    
      20/01/14 12:15:05 INFO ForEachDStream: Slide time = 5000 ms    
      20/01/14 12:15:05 INFO ForEachDStream: Storage level = Serialized 1x Replicated
      20/01/14 12:15:05 INFO ForEachDStream: Checkpoint interval = null
      20/01/14 12:15:05 INFO ForEachDStream: Remember interval = 5000 ms    
      20/01/14 12:15:05 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@a4d83ac    
      20/01/14 12:15:05 INFO ConsumerConfig: ConsumerConfig values:
      auto.commit.interval.ms = 5000
      auto.offset.reset = latest            
      bootstrap.servers = [1,2,3]            
      check.crcs = true            
      client.id = client-0            
      connections.max.idle.ms = 540000            
      default.api.timeout.ms = 60000            
      enable.auto.commit = false            
      exclude.internal.topics = true            
      fetch.max.bytes = 52428800            
      fetch.max.wait.ms = 500            
      fetch.min.bytes = 1            
      group.id = telemetry-streaming-service            
      heartbeat.interval.ms = 3000            
      interceptor.classes = []            
      internal.leave.group.on.close = true            
      isolation.level = read_uncommitted            
      key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
       
      

      Here is the log from the other executors:
         

       20/01/14 12:15:04 INFO Executor: Starting executor ID 2 on host 1    
      20/01/14 12:15:04 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40324.    
      20/01/14 12:15:04 INFO NettyBlockTransferService: Server created on 1    
      20/01/14 12:15:04 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
      20/01/14 12:15:04 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
      20/01/14 12:15:04 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)    
      20/01/14 12:15:04 INFO BlockManager: external shuffle service port = 7447    
      20/01/14 12:15:04 INFO BlockManager: Registering executor with local external shuffle service.    
      20/01/14 12:15:04 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:7447 after 1 ms (0 ms spent in bootstraps)    
      20/01/14 12:15:04 INFO BlockManager: Initialized BlockManager: BlockManagerId(2, matrix-hwork-data-05, 40324, None)    
      20/01/14 12:15:19 INFO CoarseGrainedExecutorBackend: Got assigned task 1    
      20/01/14 12:15:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)    
      20/01/14 12:15:19 INFO TorrentBroadcast: Started reading broadcast variable 0    
      20/01/14 12:15:19 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:38759 after 2 ms (0 ms spent in bootstraps)    
      20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 8.1 KB, free 6.2 GB)    
      20/01/14 12:15:20 INFO TorrentBroadcast: Reading broadcast variable 0 took 163 ms
      20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 17.9 KB, free 6.2 GB)
      20/01/14 12:15:20 INFO KafkaRDD: Computing topic telemetry, partition 1 offsets 237352170 -> 237352311
      20/01/14 12:15:20 INFO CachedKafkaConsumer: Initializing cache 16 64 0.75
      20/01/14 12:15:20 INFO CachedKafkaConsumer: Cache miss for CacheKey(spark-executor-telemetry-streaming-service,telemetry,1)
      20/01/14 12:15:20 INFO ConsumerConfig: ConsumerConfig values:
      auto.commit.interval.ms = 5000
      auto.offset.reset = none            
      bootstrap.servers = [1,2,3]            
      check.crcs = true            
      client.id = client-0            
      connections.max.idle.ms = 540000            
      default.api.timeout.ms = 60000            
      enable.auto.commit = false            
      exclude.internal.topics = true            
      fetch.max.bytes = 52428800            
      fetch.max.wait.ms = 500
      
      

       

      If we observe closely, in the first log excerpt *auto.offset.reset = latest*, but for the other executors *auto.offset.reset = none*.
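
      This looks consistent with something the spark-streaming-kafka-0-10 integration does deliberately, rather than lost configuration: before executor-side cached consumers are created, the driver-supplied Kafka parameters are rewritten so that executors fetch exactly the offset ranges the driver computed. The spark-executor-telemetry-streaming-service cache key in the log above comes from the same rewrite, and the first excerpt (which shows StreamingContext and DStream initialization) appears to be the driver-side consumer, which keeps latest. A minimal Java sketch of that rewrite, assuming the behavior of the Scala-side KafkaUtils internals; the helper name executorKafkaParams is hypothetical:

      import java.util.HashMap;
      import java.util.Map;
      import org.apache.kafka.clients.consumer.ConsumerConfig;

      // Hedged sketch, not Spark's actual source: approximates how
      // spark-streaming-kafka-0-10 rewrites the driver-supplied Kafka
      // parameters before executor-side consumers are created.
      public static Map<String, Object> executorKafkaParams(Map<String, Object> driverParams) {
          Map<String, Object> params = new HashMap<>(driverParams);
          // Executors must not auto-commit; offset management stays with the driver.
          params.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
          // Executors read exactly the offsets the driver computed, so the reset
          // policy becomes "none" to fail fast instead of silently skipping data.
          params.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
          // Executor consumers get a distinct, prefixed group id, matching the
          // "spark-executor-..." cache key in the executor log above.
          params.put(ConsumerConfig.GROUP_ID_CONFIG,
                  "spark-executor-" + driverParams.get(ConsumerConfig.GROUP_ID_CONFIG));
          return params;
      }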

       

      Here is how I am creating the streaming context:
       

      public void init() throws Exception {
      
              final String BOOTSTRAP_SERVERS = PropertyFileReader.getInstance()
                      .getProperty("spark.streaming.kafka.broker.list");
              final String DYNAMIC_ALLOCATION_ENABLED = PropertyFileReader.getInstance()
                      .getProperty("spark.streaming.dynamicAllocation.enabled");
              final String DYNAMIC_ALLOCATION_SCALING_INTERVAL = PropertyFileReader.getInstance()
                      .getProperty("spark.streaming.dynamicAllocation.scalingInterval");
              final String DYNAMIC_ALLOCATION_MIN_EXECUTORS = PropertyFileReader.getInstance()
                      .getProperty("spark.streaming.dynamicAllocation.minExecutors");
              final String DYNAMIC_ALLOCATION_MAX_EXECUTORS = PropertyFileReader.getInstance()
                      .getProperty("spark.streaming.dynamicAllocation.maxExecutors");
              final String DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT = PropertyFileReader.getInstance()
                      .getProperty("spark.streaming.dynamicAllocation.executorIdleTimeout");
              final String DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT = PropertyFileReader.getInstance()
                      .getProperty("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout");
              final String SPARK_SHUFFLE_SERVICE_ENABLED = PropertyFileReader.getInstance()
                      .getProperty("spark.shuffle.service.enabled");
              final String SPARK_LOCALITY_WAIT = PropertyFileReader.getInstance().getProperty("spark.locality.wait");
              final String SPARK_KAFKA_CONSUMER_POLL_INTERVAL = PropertyFileReader.getInstance()
                      .getProperty("spark.streaming.kafka.consumer.poll.ms");
              final String SPARK_KAFKA_MAX_RATE_PER_PARTITION = PropertyFileReader.getInstance()
                      .getProperty("spark.streaming.kafka.maxRatePerPartition");
              final String SPARK_BATCH_DURATION_IN_SECONDS = PropertyFileReader.getInstance()
                      .getProperty("spark.batch.duration.in.seconds");
              final String KAFKA_TOPIC = PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.topic");
      
              LOGGER.debug("connecting to brokers ::" + BOOTSTRAP_SERVERS);
              LOGGER.debug("bootstrapping properties to create consumer");
      
              kafkaParams = new HashMap<>();
              kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
              kafkaParams.put("key.deserializer", StringDeserializer.class);
              kafkaParams.put("value.deserializer", StringDeserializer.class);
              kafkaParams.put("group.id", "telemetry-streaming-service");
              kafkaParams.put("auto.offset.reset", "latest");
              kafkaParams.put("enable.auto.commit", false);
              kafkaParams.put("client.id", "client-0");
              // Below property should be enabled in properties and changed based on
              // performance testing
              kafkaParams.put("max.poll.records",
                      PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.max.poll.records"));
      
              LOGGER.info("registering as a consumer with the topic :: " + KAFKA_TOPIC);
              topics = Arrays.asList(KAFKA_TOPIC);
              sparkConf = new SparkConf()
      //                .setMaster(PropertyFileReader.getInstance().getProperty("spark.master.url"))
                      .setAppName(PropertyFileReader.getInstance().getProperty("spark.application.name"))
                      .set("spark.streaming.dynamicAllocation.enabled", DYNAMIC_ALLOCATION_ENABLED)
                      .set("spark.streaming.dynamicAllocation.scalingInterval", DYNAMIC_ALLOCATION_SCALING_INTERVAL)
                      .set("spark.streaming.dynamicAllocation.minExecutors", DYNAMIC_ALLOCATION_MIN_EXECUTORS)
                      .set("spark.streaming.dynamicAllocation.maxExecutors", DYNAMIC_ALLOCATION_MAX_EXECUTORS)
                      .set("spark.streaming.dynamicAllocation.executorIdleTimeout", DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
                      .set("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout",
                              DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)
                      .set("spark.shuffle.service.enabled", SPARK_SHUFFLE_SERVICE_ENABLED)
                      .set("spark.locality.wait", SPARK_LOCALITY_WAIT)
                      .set("spark.streaming.kafka.consumer.poll.ms", SPARK_KAFKA_CONSUMER_POLL_INTERVAL)
                      .set("spark.streaming.kafka.maxRatePerPartition", SPARK_KAFKA_MAX_RATE_PER_PARTITION);
      
              LOGGER.debug("creating streaming context with minutes batch interval  ::: " + SPARK_BATCH_DURATION_IN_SECONDS);
              streamingContext = new JavaStreamingContext(sparkConf,
                      Durations.seconds(Integer.parseInt(SPARK_BATCH_DURATION_IN_SECONDS)));
      
              /*
               * todo: add checkpointing to the streaming context to recover from driver
               * failures and also for offset management
               */
              LOGGER.info("checkpointing the streaming transactions at hdfs path :: /checkpoint");
              streamingContext.checkpoint("/checkpoint");
              streamingContext.addStreamingListener(new DataProcessingListener());
      }
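
      Regarding the checkpointing todo above: checkpoint-based driver recovery only kicks in when the context is obtained through JavaStreamingContext.getOrCreate rather than the constructor. A minimal sketch, assuming the same sparkConf and batch duration as in init(); the factory body is a placeholder for the setup above:

      // Hedged sketch: restore the context from the checkpoint directory after a
      // driver failure, or build a fresh one on first start. "/checkpoint" must be
      // the same directory later passed to checkpoint().
      JavaStreamingContext context = JavaStreamingContext.getOrCreate("/checkpoint", () -> {
          JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                  Durations.seconds(Integer.parseInt(SPARK_BATCH_DURATION_IN_SECONDS)));
          jssc.checkpoint("/checkpoint");
          return jssc; // invoked only when no checkpoint data exists yet
      });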
      

       

       

      public void execute() throws InterruptedException {
          JavaInputDStream<ConsumerRecord<String, String>> telemetryStream = KafkaUtils.createDirectStream(
                  streamingContext, LocationStrategies.PreferConsistent(),
                  ConsumerStrategies.Subscribe(topics, kafkaParams));

          telemetryStream.foreachRDD(rawRDD -> {
              if (!rawRDD.isEmpty()) {
                  OffsetRange[] offsetRanges = ((HasOffsetRanges) rawRDD.rdd()).offsetRanges();
                  SparkSession spark = JavaSparkSessionSingleton.getInstance(rawRDD.context().getConf());

                  JavaPairRDD<String, String> flattenedRawRDD = rawRDD.mapToPair(record -> {
                      ObjectMapper om = new ObjectMapper();
                      JsonNode root = om.readTree(record.value());
                      Map<String, JsonNode> flattenedMap = new FlatJsonGenerator(root).flatten();
                      JsonNode flattenedRootNode = om.convertValue(flattenedMap, JsonNode.class);
                      return new Tuple2<String, String>(flattenedRootNode.get("/name").asText(),
                              flattenedRootNode.toString());
                  });

                  Dataset<Row> rawFlattenedDataRDD = spark
                          .createDataset(flattenedRawRDD.rdd(),
                                  Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
                          .toDF("sensor_path", "sensor_data");
                  Dataset<Row> groupedDS = rawFlattenedDataRDD.groupBy(col("sensor_path"))
                          .agg(collect_list(col("sensor_data")).as("sensor_data"));
                  Dataset<Row> lldpGroupedDS = groupedDS.filter((FilterFunction<Row>) r -> r.getString(0)
                          .equals("Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/devices/device"));

                  HashMap<Object, Object> params = new HashMap<>();
                  params.put(DPConstants.OTSDB_CONFIG_F_PATH,
                          ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.config.file.path"));
                  params.put(DPConstants.OTSDB_CLIENT_TYPE,
                          ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.client.type"));

                  try {
                      Pipeline lldpPipeline = PipelineFactory.getPipeline(PipelineType.LLDPTELEMETRY);
                      lldpPipeline.process(lldpGroupedDS, null);
                      Pipeline pipeline = PipelineFactory.getPipeline(PipelineType.TELEMETRY);
                      pipeline.process(groupedDS, params);
                  } catch (Throwable t) {
                      t.printStackTrace();
                  }

                  ((CanCommitOffsets) telemetryStream.inputDStream()).commitAsync(offsetRanges);
              }
          });

          streamingContext.start();
          streamingContext.awaitTermination();
      }
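
      A side note on the offset handling in execute(): commitAsync has an overload that takes an org.apache.kafka.clients.consumer.OffsetCommitCallback, which makes failed commits visible instead of silently dropped. A minimal sketch; the error logging is illustrative, and LOGGER is assumed to be the same logger used in init():

      ((CanCommitOffsets) telemetryStream.inputDStream()).commitAsync(offsetRanges,
              (offsets, exception) -> {
                  // Surface async commit failures; offsets maps TopicPartition ->
                  // OffsetAndMetadata for the attempted commit.
                  if (exception != null) {
                      LOGGER.error("Kafka offset commit failed for " + offsets, exception);
                  }
              });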
      

       

          People

            Assignee: Unassigned
            Reporter: phanikumar (phanikumaryadavilli)
