Details
Description
I am a beginner with PySpark and am building a pilot project in Spark. I develop in the PyCharm IDE, and the project runs fine there. The project works as follows: I produce JSON to a Kafka topic, consume the topic in Spark, and convert each RDD of values (which are JSON strings) to a DataFrame using productInfo = sqlContext.read.json(rdd). On my local machine this works perfectly: after converting the RDD to a DataFrame, I display it on the console with the show() method.
The problem arises when I set all of this up (Kafka, Apache Spark) on EC2 (Ubuntu 18.04.2 LTS) and run it with spark-submit. The console output stops when execution reaches show() and nothing is displayed; on the next batch it starts again and stops at show() again. I can't figure out what the error is, because no error appears in the console. I also checked whether my data is arriving in the RDD, and it is.
My Code:
# coding: utf-8
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import Row, DataFrame, SQLContext
import pandas as pd


def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']


def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        #print("--------------Also cross check my data is present in rdd I checked by printing ----------------")
        #results = rdd.collect()
        #for result in results:
            #print(result)
        # Get the singleton instance of SQLContext
        sqlContext = getSqlContextInstance(rdd.context)
        productInfo = sqlContext.read.json(rdd)
        # problem comes here when i try to show it
        productInfo.show()
    except:
        pass


if __name__ == '__main__':
    conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1")
    sc = SparkContext(conf = conf)
    sc.setLogLevel("WARN")
    sqlContext = SQLContext(sc)
    ssc = StreamingContext(sc,10)
    kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'new_topic':1})
    lines = kafkaStream.map(lambda x: x[1])
    lines.foreachRDD(process)
    #lines.pprint()
    ssc.start()
    ssc.awaitTermination()
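Note that the process handler above ends in a bare except: pass, so any exception raised by read.json or show() would be discarded without a trace, which may be why no error appears in the console. A minimal sketch of a wrapper that prints the traceback instead is given here. The names log_exceptions and failing_handler are hypothetical and used only for illustration; failing_handler merely stands in for process and simulates a failure, so this is not a claim about what the real error is.

```python
import functools
import traceback


def log_exceptions(func):
    """Wrap a foreachRDD-style handler so failures are printed, not dropped."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Print the full traceback to stderr so the real cause
            # shows up in the spark-submit console.
            traceback.print_exc()
            return None
    return wrapper


@log_exceptions
def failing_handler(time, rdd):
    # Hypothetical stand-in for process(time, rdd): raises to simulate
    # show() failing inside the handler.
    raise ValueError("simulated failure inside show()")
```

Decorating process with @log_exceptions and removing its inner try/except should make the underlying error, if there is one, visible on the next batch.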
My console:
./spark-submit ReadingJsonFromKafkaAndWritingToScylla_CSV_Example.py
19/07/10 11:13:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/07/10 11:13:15 INFO SparkContext: Running Spark version 2.4.3
19/07/10 11:13:15 INFO SparkContext: Submitted application: ReadingJsonFromKafkaAndWritingToScylla_CSV_Example.py
19/07/10 11:13:15 INFO SecurityManager: Changing view acls to: kafka
19/07/10 11:13:15 INFO SecurityManager: Changing modify acls to: kafka
19/07/10 11:13:15 INFO SecurityManager: Changing view acls groups to:
19/07/10 11:13:15 INFO SecurityManager: Changing modify acls groups to:
19/07/10 11:13:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(kafka); groups with view permissions: Set(); users with modify permissions: Set(kafka); groups with modify permissions: Set()
19/07/10 11:13:16 INFO Utils: Successfully started service 'sparkDriver' on port 41655.
19/07/10 11:13:16 INFO SparkEnv: Registering MapOutputTracker
19/07/10 11:13:16 INFO SparkEnv: Registering BlockManagerMaster
19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/07/10 11:13:16 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-33f848fe-88d7-4c8f-8440-8384e094c59c
19/07/10 11:13:16 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
19/07/10 11:13:16 INFO SparkEnv: Registering OutputCommitCoordinator
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
19/07/10 11:13:16 INFO Utils: Successfully started service 'SparkUI' on port 4046.
19/07/10 11:13:16 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://ip-172-31-92-134.ec2.internal:4046
19/07/10 11:13:16 INFO Executor: Starting executor ID driver on host localhost
19/07/10 11:13:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34719.
19/07/10 11:13:16 INFO NettyBlockTransferService: Server created on ip-172-31-92-134.ec2.internal:34719
19/07/10 11:13:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/07/10 11:13:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-31-92-134.ec2.internal:34719 with 366.3 MB RAM, BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
19/07/10 11:13:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
19/07/10 11:13:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
19/07/10 11:13:17 WARN AppInfo$: Can't read Kafka version from MANIFEST.MF. Possible cause: java.lang.NullPointerException
19/07/10 11:13:18 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:13:18 WARN BlockManager: Block input-0-1562757198000 replicated to only 0 peer(s) instead of 1 peers
This is the output when I am not producing any data to my Kafka topic:
========= 2019-07-10 11:13:20 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
========= 2019-07-10 11:13:30 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
++
++
++
------------------------after printing-----------------------
========= 2019-07-10 11:13:40 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
++
++
++
------------------------after printing-----------------------
========= 2019-07-10 11:15:40 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
++
++
++
------------------------after printing-----------------------
19/07/10 11:15:47 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:15:47 WARN BlockManager: Block input-0-1562757347200 replicated to only 0 peer(s) instead of 1 peers
This is the output once I start producing data to my Kafka topic:
========= 2019-07-10 11:15:50 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
19/07/10 11:15:52 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:15:52 WARN BlockManager: Block input-0-1562757352200 replicated to only 0 peer(s) instead of 1 peers
19/07/10 11:15:57 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:15:57 WARN BlockManager: Block input-0-1562757357200 replicated to only 0 peer(s) instead of 1 peers
========= 2019-07-10 11:16:00 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
19/07/10 11:16:02 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:16:02 WARN BlockManager: Block input-0-1562757362200 replicated to only 0 peer(s) instead of 1 peers
19/07/10 11:16:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:16:07 WARN BlockManager: Block input-0-1562757367400 replicated to only 0 peer(s) instead of 1 peers
========= 2019-07-10 11:16:10 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
19/07/10 11:16:12 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:16:12 WARN BlockManager: Block input-0-1562757372400 replicated to only 0 peer(s) instead of 1 peers
19/07/10 11:16:17 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:16:17 WARN BlockManager: Block input-0-1562757377400 replicated to only 0 peer(s) instead of 1 peers
I don't know how to figure this out. Can anyone help me? It would be really appreciated.
Thank you