Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-8486

How to commit offset via Kafka

    XML | Word | Printable | JSON

Details

    • Wish
    • Status: Resolved
    • Trivial
    • Resolution: Invalid
    • 2.2.1
    • None
    • consumer
    • None

    Description

      from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
      from json import loads
      import vertica_python
      from datetime import datetime as dt
      from time import sleep

      # Keyword arguments passed to vertica_python.connect(); host, user,
      # password and database are blank placeholders in this snippet.
      conn_vertica = {
          'host': '',
          'port': 5433,
          'user': '',
          'password': '',
          'database': '',
          'use_prepared_statements': True,
      }

      # Connection settings of the load target (used by load()).
      conn_to = conn_vertica

      def load():
          """Read one batch from Kafka, store it in Vertica, then commit offsets.

          Polls the 'orderSummary' topic until either ``timeout`` seconds have
          elapsed or at least ``max_len`` records have been collected. A single
          poll can return several records, so the batch may exceed ``max_len``;
          it is never truncated because every polled record must be stored
          before its offset is committed.

          The collected records are inserted into
          ``stage.FS_Orders_from_kafka`` and the database transaction is
          committed. Only after that are the consumer offsets committed, so a
          failure in the database step leaves the offsets untouched and the same
          records are delivered again on the next run (at-least-once).

          Prints 'Nothing!' and returns without opening a database connection
          when no records were received. The consumer is always closed.
          """
          parsed_topic_name = 'orderSummary'
          timeout = 20   # seconds to keep polling
          max_len = 10   # stop polling early once this many records arrived

          consumer = KafkaConsumer(
              parsed_topic_name,
              auto_offset_reset='earliest',
              bootstrap_servers=['us-kafka-broker:9092'],
              # Offsets are committed explicitly below, after the DB commit.
              enable_auto_commit=False,
              group_id="my_group",
              value_deserializer=lambda x: loads(x.decode('utf-8')),
          )
          try:
              res = []
              t1 = dt.now()
              # Stop on whichever limit is hit first. (With `or` the loop could
              # never end on an idle topic: len(res) < max_len stays true.)
              while (dt.now() - t1).total_seconds() < timeout and len(res) < max_len:
                  # Block up to 1 s per poll instead of spinning with the
                  # default zero timeout.
                  msgs = consumer.poll(timeout_ms=1000)
                  print(msgs)
                  # poll() returns {TopicPartition: [records]}; flatten it.
                  for v in msgs.values():
                      res += v

              if not res:
                  print('Nothing!')
                  return

              with vertica_python.connect(**conn_to) as conn_2:
                  curs2 = conn_2.cursor()
                  curs2.executemany(
                      '''
                      INSERT INTO stage.FS_Orders_from_kafka (load_dtm,topic_name, partition_id, "offset", value)
                      VALUES (?, ?, ?, ?, ?)''',
                      [(r.timestamp, r.topic, r.partition, r.offset, r.value) for r in res],
                  )
                  curs2.execute('COMMIT')

              # The rows are durable now: commit the consumed positions for
              # this consumer group so the next run resumes after them.
              consumer.commit()
          finally:
              consumer.close()

      # Run a single load pass as soon as this script is executed.
      load()

      Attachments

        Activity

          People

            Unassigned Unassigned
            Klevtsov Stanislav
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: