HIVE-20377

Hive Kafka Storage Handler

Details

    • New Feature
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 4.0.0
    • 4.0.0-alpha-1
    • kafka integration
    • None

    Description

      Goal

      • Read streaming data from a Kafka queue as an external table.
      • Allow streaming navigation by pushing down filters on the Kafka record partition id, offset and timestamp.
      • Insert streaming data from Kafka into an actual Hive internal table, using a CTAS statement.

        Example

        Create the external table

         
        CREATE EXTERNAL TABLE kafka_table (`timestamp` timestamp, page string, `user` string, language string, added int, deleted int, flags string, comment string, namespace string)
        STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
        TBLPROPERTIES 
        ("kafka.topic" = "wikipedia", 
        "kafka.bootstrap.servers"="brokeraddress:9092",
        "kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe");
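
        The third goal (loading Kafka data into an internal table) could then look roughly like the sketch below. It assumes the kafka_table defined above; the target table name wiki_snapshot, the ORC format and the one-hour window are hypothetical choices made only for illustration.

        ```sql
        -- Sketch only: wiki_snapshot, ORC and the 1-hour window are assumptions.
        -- Copies the last hour of records (by Kafka timestamp, in milliseconds)
        -- into a regular Hive table with a CTAS statement.
        CREATE TABLE wiki_snapshot STORED AS ORC AS
        SELECT `timestamp`, page, `user`, added, deleted
        FROM kafka_table
        WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' hours);
        ```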
        

        Kafka Metadata

        To keep track of Kafka records, the storage handler automatically adds the Kafka row metadata to the table, e.g. partition id, record offset and record timestamp.

        DESCRIBE EXTENDED kafka_table
        
        timestamp              	timestamp           	from deserializer   
        page                	string              	from deserializer   
        user                	string              	from deserializer   
        language            	string              	from deserializer   
        country             	string              	from deserializer   
        continent           	string              	from deserializer   
        namespace           	string              	from deserializer   
        newpage             	boolean             	from deserializer   
        unpatrolled         	boolean             	from deserializer   
        anonymous           	boolean             	from deserializer   
        robot               	boolean             	from deserializer   
        added               	int                 	from deserializer   
        deleted             	int                 	from deserializer   
        delta               	bigint              	from deserializer   
        __partition         	int                 	from deserializer   
        __offset            	bigint              	from deserializer   
        __timestamp         	bigint              	from deserializer   
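
        Since the metadata appears as ordinary columns, it can be selected and aggregated like any other field. A minimal sketch, assuming the kafka_table above (the column aliases are illustrative only):

        ```sql
        -- Show where each row came from in the topic.
        SELECT `__partition`, `__offset`, `__timestamp`, page
        FROM kafka_table
        LIMIT 10;

        -- Summarize the offset range and record count seen per Kafka partition.
        SELECT `__partition`,
               min(`__offset`) AS first_offset,
               max(`__offset`) AS last_offset,
               count(*)        AS records
        FROM kafka_table
        GROUP BY `__partition`;
        ```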
        
        

      Filter push down.

      Newer Kafka consumers (0.11.0 and higher) allow seeking on the stream based on a given offset. The proposed storage handler leverages this API by pushing down filters over the metadata columns, namely __partition (int), __offset (long) and __timestamp (long).
      For instance, a query like

       
      select `__offset` from kafka_table where (`__offset` < 10 and `__offset`>3 and `__partition` = 0) or (`__partition` = 0 and `__offset` < 105 and `__offset` > 99) or (`__offset` = 109);
      

      will result in a scan of partition 0 only, reading only the records between offsets 4 and 109.
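
      A simpler case makes the intent easier to see. In the sketch below (the partition number and offset bounds are arbitrary, chosen for illustration), the predicates name a single partition and a half-open offset range; if the filter is pushed down as described, the handler should only need to read offsets 100 through 199 of partition 0 rather than the whole topic.

      ```sql
      -- Arbitrary bounds for illustration: one partition, offsets [100, 200).
      SELECT `__offset`, page, `user`
      FROM kafka_table
      WHERE `__partition` = 0
        AND `__offset` >= 100
        AND `__offset` < 200;
      ```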

      With timestamp seeks

      Seeking on the internal Kafka record timestamps lets a query run over recently arrived data only, for example:

      select count(*) from kafka_table where `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '20' hours) ;
      

      This allows implicit relationships between event timestamps and Kafka timestamps to be expressed in queries (e.g. the event timestamp is always earlier than the Kafka __timestamp, and the Kafka __timestamp is never more than 15 minutes after the event).
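
      As a sketch of that idea, suppose the two assumptions from the parenthetical hold for kafka_table: the event column `timestamp` is always earlier than `__timestamp`, and the lag never exceeds 15 minutes. A query over events from two hours ago up to one hour ago can then add a matching bound on `__timestamp`, widened by 15 minutes on the upper side, so that the seek can narrow the scan. The window sizes are illustrative only.

      ```sql
      -- Assumes: event time < Kafka time <= event time + 15 minutes.
      -- Event window: (now - 2h, now - 1h]
      -- Kafka window: (now - 2h, now - 1h + 15min] = (now - 2h, now - 45min]
      SELECT count(*)
      FROM kafka_table
      WHERE `timestamp` >  CURRENT_TIMESTAMP - interval '2' hours
        AND `timestamp` <= CURRENT_TIMESTAMP - interval '1' hours
        AND `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '2' hours)
        AND `__timestamp` <= 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '45' minutes);
      ```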

      More examples with Avro

      CREATE EXTERNAL TABLE wiki_kafka_avro_table
      STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
      TBLPROPERTIES
      ("kafka.topic" = "wiki_kafka_avro_table",
      "kafka.bootstrap.servers"="localhost:9092",
      "kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe",
      'avro.schema.literal'='{
        "type" : "record",
        "name" : "Wikipedia",
        "namespace" : "org.apache.hive.kafka",
        "version": "1",
        "fields" : [ {
          "name" : "isrobot",
          "type" : "boolean"
        }, {
          "name" : "channel",
          "type" : "string"
        }, {
          "name" : "timestamp",
          "type" : "string"
        }, {
          "name" : "flags",
          "type" : "string"
        }, {
          "name" : "isunpatrolled",
          "type" : "boolean"
        }, {
          "name" : "page",
          "type" : "string"
        }, {
          "name" : "diffurl",
          "type" : "string"
        }, {
          "name" : "added",
          "type" : "long"
        }, {
          "name" : "comment",
          "type" : "string"
        }, {
          "name" : "commentlength",
          "type" : "long"
        }, {
          "name" : "isnew",
          "type" : "boolean"
        }, {
          "name" : "isminor",
          "type" : "boolean"
        }, {
          "name" : "delta",
          "type" : "long"
        }, {
          "name" : "isanonymous",
          "type" : "boolean"
        }, {
          "name" : "user",
          "type" : "string"
        }, {
          "name" : "deltabucket",
          "type" : "double"
        }, {
          "name" : "deleted",
          "type" : "long"
        }, {
          "name" : "namespace",
          "type" : "string"
        } ]
      }'
      );
      
      

      Attachments

        1. HIVE-20377.10.patch
          199 kB
          Slim Bouguerra
        2. HIVE-20377.11.patch
          200 kB
          Slim Bouguerra
        3. HIVE-20377.12.patch
          265 kB
          Slim Bouguerra
        4. HIVE-20377.15.patch
          271 kB
          Slim Bouguerra
        5. HIVE-20377.18.patch
          272 kB
          Slim Bouguerra
        6. HIVE-20377.18.patch
          272 kB
          Slim Bouguerra
        7. HIVE-20377.19.patch
          272 kB
          Slim Bouguerra
        8. HIVE-20377.19.patch
          272 kB
          Slim Bouguerra
        9. HIVE-20377.19.patch
          272 kB
          Slim Bouguerra
        10. HIVE-20377.4.patch
          162 kB
          Slim Bouguerra
        11. HIVE-20377.5.patch
          192 kB
          Slim Bouguerra
        12. HIVE-20377.6.patch
          196 kB
          Slim Bouguerra
        13. HIVE-20377.8.patch
          197 kB
          Slim Bouguerra
        14. HIVE-20377.8.patch
          197 kB
          Slim Bouguerra
        15. HIVE-20377.patch
          160 kB
          Slim Bouguerra

            People

              bslim Slim Bouguerra
              bslim Slim Bouguerra