Spark / SPARK-31795

Stream Data with API to ServiceNow


Details

    • Type: Question
    • Status: Resolved
    • Priority: Major
    • Resolution: Workaround
    • Affects Version/s: 2.4.5
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels: None

    Description

      1) Create class

      2) Instantiate class 

      3) Setup stream

      4) Write stream. This is where I get a pickling error; I don't know how to make it work without the error.


      import json
      import requests
      from pyspark.sql.functions import col
      # On Databricks, `spark` is the predefined SparkSession.

      class CMDB:
          # Public properties
          @property
          def streamDF(self):
              return spark.readStream.table(self.__source_table)

          # Constructor (double underscores: `_init_` would never be called by Python)
          def __init__(self, destination_table, source_table):
              self.__destination_table = destination_table
              self.__source_table = source_table

          # Private methods
          def __processRow(self, row):
              # API connection info
              url = 'https://foo.service-now.com/api/now/table/' + self.__destination_table + '?sysparm_display_value=true'
              user = 'username'
              password = 'psw'

              headers = {"Content-Type": "application/json", "Accept": "application/json"}
              response = requests.post(url, auth=(user, password), headers=headers, data=json.dumps(row.asDict()))

              return response

          # Public methods
          def uploadStreamDF(self, df):
              return df.writeStream.foreach(self.__processRow).trigger(once=True).start()

       

      ################################################################################

       

      cmdb = CMDB('destination_table_name', 'source_table_name')

      streamDF = (cmdb.streamDF
                  .withColumn('object_id', col('source_column_id'))
                  .withColumn('name', col('source_column_name'))
                  ).select('object_id', 'name')
      # Setting up the stream works; the data can be displayed.

      cmdb.uploadStreamDF(streamDF)
      # This call fails with: PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063. See exception below:

      '''
      Exception Traceback (most recent call last)
      /databricks/spark/python/pyspark/serializers.py in dumps(self, obj)
      704 try:
      --> 705 return cloudpickle.dumps(obj, 2)
      706 except pickle.PickleError:

      /databricks/spark/python/pyspark/cloudpickle.py in dumps(obj, protocol)
      862 cp = CloudPickler(file,protocol)
      --> 863 cp.dump(obj)
      864 return file.getvalue()

      /databricks/spark/python/pyspark/cloudpickle.py in dump(self, obj)
      259 try:
      --> 260 return Pickler.dump(self, obj)
      261 except RuntimeError as e:

      /databricks/python/lib/python3.7/pickle.py in dump(self, obj)
      436 self.framer.start_framing()
      --> 437 self.save(obj)
      438 self.write(STOP)
      '''
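Given that the resolution is "Workaround": the PicklingError arises because `foreach(self.__processRow)` passes a bound method, which drags the entire `CMDB` instance into the serialized closure — and through its `streamDF` property, a reference to the `spark` session, which cannot leave the driver (SPARK-5063). A minimal sketch of one workaround, assuming the same ServiceNow endpoint: build the row handler as a closure over plain strings only, so nothing unpicklable is captured. `make_row_handler` is a hypothetical name, not part of the original code.

```python
import json

def make_row_handler(destination_table, user, password):
    """Build a row handler that closes over plain picklable values only.

    Unlike the bound method self.__processRow, the returned function
    captures just strings and a dict -- no `self`, no SparkSession --
    so Spark can ship it to the workers without hitting SPARK-5063.
    """
    url = ('https://foo.service-now.com/api/now/table/'
           + destination_table + '?sysparm_display_value=true')
    headers = {"Content-Type": "application/json",
               "Accept": "application/json"}

    def process_row(row):
        # Imported here so the module is resolved on the worker,
        # not captured from the driver.
        import requests
        return requests.post(url, auth=(user, password),
                             headers=headers,
                             data=json.dumps(row.asDict()))

    return process_row

handler = make_row_handler('destination_table_name', 'username', 'psw')
```

The handler would then replace the bound method in the write: `streamDF.writeStream.foreach(handler).trigger(once=True).start()`.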

      People

            Assignee: Unassigned
            Reporter: Dominic Wetenkamp