Details
-
Task
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
None
-
None
-
None
Description
I have a python code updated with my own logic to replace the index of enums with values, but in order to do so , i need to execute certain sql commands to get some values and the best thing would be to get the dbcp connection pool connection from nifi controller services and execute the commands , but i dont know to how to implement that in python, since few modules such as pymysql doesnt support in Nifi(Jython) ,Please find the code below:
Any kind of help would be highly appreciated. Thanks in advance
import json
import re
import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil
class TransformCallback(StreamCallback):
def _init_(self):
pass
def process(self, inputStream, outputStream):
try:
# Read input FlowFile content
input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
input_obj = json.loads(input_text)
table_name = input_obj['table_name']
column_name = "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name='"table_name"' AND data_type='enum'"
enum_value_sql = "SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\'','') enums FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name='"table_name"' AND column_name='"column_name"'"
enum_value = enum_value_sql.split(',')
for col in input_obj['columns']:
if col['name'] == str(column_name):
col['value'] = enum_value[int(col['value']) - 1]
output_text = json.dumps(input_obj)
outputStream.write(StringUtil.toBytes(output_text))
except:
traceback.print_exc(file=sys.stdout)
raise
flowFile = session.get()
if flowFile != None:
flowFile = session.write(flowFile, TransformCallback())
# Finish by transferring the FlowFile to an output relationship
session.transfer(flowFile, REL_SUCCESS)