Details
-
Wish
-
Status: Resolved
-
Trivial
-
Resolution: Invalid
-
2.2.1
-
None
-
None
Description
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from json import loads
import vertica_python
from datetime import datetime as dt
from time import sleep
conn_vertica =
{'host': '', 'port': 5433, 'user': '', 'password': '', 'database': '', 'use_prepared_statements': True}conn_to = conn_vertica
def load()
parsed_topic_name = 'orderSummary'
consumer = KafkaConsumer(parsed_topic_name, auto_offset_reset='earliest',
bootstrap_servers=['us-kafka-broker:9092'],
enable_auto_commit=False,
group_id="my_group",
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
timeout = 20
max_len = 10
res = []
t1 = dt.now()
while (dt.now()-t1).seconds < timeout or len(res) < max_len:
msgs = consumer.poll()
print(msgs)
for v in msgs.values()
res += v
with vertica_python.connect(**conn_to) as conn_2:
curs2 = conn_2.cursor()
if res:
curs2.executemany('''
INSERT INTO stage.FS_Orders_from_kafka (load_dtm,topic_name, partition_id, "offset", value)
VALUES (?, ?, ?, ?, ?)''', [(r.timestamp, r.topic, r.partition, r.offset, r.value) for r in res])
curs2.execute('COMMIT')
else:
print('Nothing!')
consumer.close()
#sleep(5)
load()