Uploaded image for project: 'Apache NiFi'
  1. Apache NiFi
  2. NIFI-7820

How to connect to controller service DBCP connection pool and execute the sql using that connection via python

    XMLWordPrintableJSON

Details

    • Task
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • None
    • NiFi Stateless
    • None

    Description

      I have a python code updated with my own logic to replace the index of enums with values, but in order to do so , i need to execute certain sql commands to get some values and the best thing would be to get the dbcp connection pool connection from nifi controller services and execute the commands , but i dont know to how to implement that in python, since few modules such as pymysql doesnt support in Nifi(Jython) ,Please find the code below:

      Any kind of help would be highly appreciated. Thanks in advance

       

      import json
      import re
      import sys
      import traceback
      from java.nio.charset import StandardCharsets
      from org.apache.commons.io import IOUtils
      from org.apache.nifi.processor.io import StreamCallback
      from org.python.core.util import StringUtil

      class TransformCallback(StreamCallback):
          def _init_(self):
              pass

          def process(self, inputStream, outputStream):
              try:
                  # Read input FlowFile content
                  input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
                  input_obj = json.loads(input_text)
       
                  
                  table_name = input_obj['table_name']
                  column_name =  "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name='"table_name"' AND data_type='enum'"
                  enum_value_sql = "SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\'','') enums FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name='"table_name"' AND column_name='"column_name"'"
                  enum_value = enum_value_sql.split(',')

                  for col in input_obj['columns']:
                      if col['name'] == str(column_name):
                          col['value'] = enum_value[int(col['value']) - 1]

                  output_text = json.dumps(input_obj)
                  outputStream.write(StringUtil.toBytes(output_text))
              except:
                  traceback.print_exc(file=sys.stdout)
                  raise

      flowFile = session.get()
      if flowFile != None:
          flowFile = session.write(flowFile, TransformCallback())

          # Finish by transferring the FlowFile to an output relationship
      session.transfer(flowFile, REL_SUCCESS)

      Attachments

        1. execute_script.py
          2 kB
          zeyk
        2. image-2020-09-20-01-44-32-117.png
          94 kB
          Roberto Garcia
        3. image-2020-09-20-01-45-19-739.png
          57 kB
          Roberto Garcia
        4. image-2020-09-20-01-58-03-615.png
          76 kB
          Roberto Garcia
        5. enumsWithPython.xml
          12 kB
          Roberto Garcia
        6. image-2020-09-20-17-36-02-743.png
          219 kB
          zeyk
        7. image-2020-09-20-17-37-12-487.png
          15 kB
          zeyk
        8. image-2020-09-20-17-37-49-409.png
          37 kB
          zeyk

        Activity

          People

            Unassigned Unassigned
            utra zeyk
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: