Spark / SPARK-31795

Stream Data with API to ServiceNow


    Details

    • Type: Question
    • Status: Resolved
    • Priority: Major
    • Resolution: Workaround
    • Affects Version/s: 2.4.5
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels:
      None

      Description

      1) Create class

      2) Instantiate class 

      3) Setup stream

      4) Write stream. Here I get a pickling error, and I don't know how to make it work without the error.

       

       

      import json
      import requests

      class CMDB:
          # Public properties
          @property
          def streamDF(self):
              return spark.readStream.table(self.__source_table)

          # Constructor
          def __init__(self, destination_table, source_table):
              self.__destination_table = destination_table
              self.__source_table = source_table

          # Private methods
          def __processRow(self, row):
              # API connection info
              url = ('https://foo.service-now.com/api/now/table/'
                     + self.__destination_table + '?sysparm_display_value=true')
              user = 'username'
              password = 'psw'

              headers = {"Content-Type": "application/json", "Accept": "application/json"}
              response = requests.post(url, auth=(user, password), headers=headers,
                                       data=json.dumps(row.asDict()))

              return response

          # Public methods
          def uploadStreamDF(self, df):
              return df.writeStream.foreach(self.__processRow).trigger(once=True).start()

       

      ################################################################################

       

      from pyspark.sql.functions import col

      cmdb = CMDB('destination_table_name', 'source_table_name')

      streamDF = (cmdb.streamDF
                  .withColumn('object_id', col('source_column_id'))
                  .withColumn('name', col('source_column_name'))
                  ).select('object_id', 'name')
      # Setting up the stream works; the data can be displayed.

      cmdb.uploadStreamDF(streamDF)
      # cmdb.uploadStreamDF(streamDF) fails with:
      # PicklingError: Could not serialize object: Exception: It appears that you
      # are attempting to reference SparkContext from a broadcast variable, action,
      # or transformation. SparkContext can only be used on the driver, not in code
      # that it run on workers. For more information, see SPARK-5063.
      # See exception below:

      '''
      Exception Traceback (most recent call last)
      /databricks/spark/python/pyspark/serializers.py in dumps(self, obj)
      704 try:
      --> 705 return cloudpickle.dumps(obj, 2)
      706 except pickle.PickleError:

      /databricks/spark/python/pyspark/cloudpickle.py in dumps(obj, protocol)
      862 cp = CloudPickler(file,protocol)
      --> 863 cp.dump(obj)
      864 return file.getvalue()

      /databricks/spark/python/pyspark/cloudpickle.py in dump(self, obj)
      259 try:
      --> 260 return Pickler.dump(self, obj)
      261 except RuntimeError as e:

      /databricks/python/lib/python3.7/pickle.py in dump(self, obj)
      436 self.framer.start_framing()
      --> 437 self.save(obj)
      438 self.write(STOP)
      '''
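A common workaround for SPARK-5063-style pickling errors is to pass `writeStream.foreach()` a standalone function that captures only plain values, instead of a bound method on a class whose `streamDF` property references `spark`: cloudpickle then only has to serialize the closure and a few strings, never the driver-side SparkContext. A minimal sketch, reusing the names from the report (the deferred `requests` import inside the closure is an assumption about where the executor-side dependency is available):

```python
import json

def build_url(destination_table):
    """Build the ServiceNow table-API URL (same pattern as in the report)."""
    return ('https://foo.service-now.com/api/now/table/'
            + destination_table + '?sysparm_display_value=true')

def make_row_sender(destination_table, user, password):
    """Return a plain function suitable for writeStream.foreach().

    The closure captures only strings, so it can be pickled and shipped
    to the workers; nothing here references `spark` or the SparkContext.
    """
    url = build_url(destination_table)
    headers = {"Content-Type": "application/json", "Accept": "application/json"}

    def send_row(row):
        import requests  # imported on the executor, not at pickling time
        return requests.post(url, auth=(user, password), headers=headers,
                             data=json.dumps(row.asDict()))

    return send_row
```

Usage would then be `streamDF.writeStream.foreach(make_row_sender('destination_table_name', 'username', 'psw')).trigger(once=True).start()`. On Spark 2.4+ another option is `foreachBatch`, which hands each micro-batch to a driver-side function as an ordinary DataFrame and avoids pickling the per-row writer altogether.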

        Attachments

          Activity


            People

            • Assignee:
              Unassigned
            • Reporter:
              D0minic (Dominic Wetenkamp)

              Dates

              • Created:
                Updated:
                Resolved:
