Details
-
Question
-
Status: Resolved
-
Major
-
Resolution: Workaround
-
2.4.5
-
None
-
None
Description
1) Create class
2) Instantiate class
3) Setup stream
4) Write stream. Here do I get a pickeling error, I really don't know how to get it work without error.
class CMDB:
#Public Properties
@property
def streamDF(self):
return spark.readStream.table(self.__source_table)
#Constructor
def _init_(self, destination_table, source_table):
self.__destination_table = destination_table
self.__source_table = source_table
#Private Methodes
def __processRow(self, row):
#API connection info
url = 'https://foo.service-now.com/api/now/table/' + self.__destination_table + '?sysparm_display_value=true'
user = 'username'
password = 'psw'
headers = {"Content-Type":"application/json","Accept":"application/json"}
response = requests.post(url, auth=(user, password), headers=headers, data = json.dumps(row.asDict()))
return response
#Public Methodes
def uploadStreamDF(self, df):
return df.writeStream.foreach(self.__processRow).trigger(once=True).start()
################################################################################
cmdb = CMDB('destination_table_name','source_table_name')
streamDF = (cmdb.streamDF
.withColumn('object_id',col('source_column_id'))
.withColumn('name',col('source_column_name'))
).select('object_id','name')
#set stream works, able to display data
cmdb.uploadStreamDF(streamDF)
#cmdb.uploadStreamDF(streamDF) fails with error: PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063. See exception below:
'''
Exception Traceback (most recent call last)
/databricks/spark/python/pyspark/serializers.py in dumps(self, obj)
704 try:
--> 705 return cloudpickle.dumps(obj, 2)
706 except pickle.PickleError:
/databricks/spark/python/pyspark/cloudpickle.py in dumps(obj, protocol)
862 cp = CloudPickler(file,protocol)
--> 863 cp.dump(obj)
864 return file.getvalue()
/databricks/spark/python/pyspark/cloudpickle.py in dump(self, obj)
259 try:
--> 260 return Pickler.dump(self, obj)
261 except RuntimeError as e:
/databricks/python/lib/python3.7/pickle.py in dump(self, obj)
436 self.framer.start_framing()
--> 437 self.save(obj)
438 self.write(STOP)
'''