FLUME-74: HIVE & FLUME integration - Documentation + Code

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: v0.9.1
    • Fix Version/s: v0.9.5
    • Component/s: Docs, Sinks+Sources
    • Labels: None

      Description

      http://archive.cloudera.com/cdh/3/flume/UserGuide.html does not contain information on how to import logs from FLUME to HIVE. Any insights on how to get it to work?


          Activity

          Jonathan Hsieh added a comment -

          Currently there are some hooks in HiveNotifyingDfsSink. These provide the information necessary for Hive to register a new directory partition (or to tell Oozie to trigger Hive to do it). This assumes that your data is written out to directories that are partitioned. This does not modify the Hive metadata store, or actually trigger Hive to start using those dirs – it just provides the bare minimum information needed to notify Hive, assuming the metadata is already set up.

          An initial cut would likely use an Avro-serialized representation of a Flume event.

          Part of me wants to package this info as just another event that gets sent off to some other waiting listener whose sole job is to tell Oozie/Hive that new data is ready.
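
          A minimal sketch of what such a hook could look like. The HiveDirCreatedHandler interface comes up later in this thread, but its exact signature and this stub body are assumptions, not the actual Flume code:

              // Hypothetical callback fired when a sink finishes writing a
              // partition directory (signature is an assumption).
              public interface HiveDirCreatedHandler {
                void handleDirCreated(String dirPath);
              }

              // Stub implementation: surfaces only the bare minimum
              // information Hive would need to register the partition.
              public class LoggingHiveDirHandler implements HiveDirCreatedHandler {
                public void handleDirCreated(String dirPath) {
                  System.out.println("New partition dir ready for Hive: " + dirPath);
                }
              }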

          Jonathan Hsieh added a comment - edited

          17:36 <jmhsieh> Here's the story I had in mind after talking with carl (guy who is doing the next hive release)
          17:37 <jmhsieh> we use the collectorsink with the escaping to write out events to the different directories.
          17:37 <jmhsieh> these can be gzip files – that shouldn't matter too much.
          17:37 <jmhsieh> Hive needs a thing called a SerDe (Serializer / Deserializer) to understand data.
          17:38 <jmhsieh> Ideally flume would write to a format that hive understands already, or we'd hook in a new format.
          17:38 <jmhsieh> Instead of loading each record one by one, we bulk load data into hive. This is done by pointing hive to particular directories in hdfs (which flume has nicely laid out!).
          17:39 <jmhsieh> the directory structure reflects the partitioning used in hive.
          17:39 <jmhsieh> a new hive query runs, and since the data is there, it uses the new data.
          17:40 <jmhsieh> The catches are – there needs to be something to notify hive that there is a new partition/dir to load when doing queries.
          17:40 <aphadke> (i'll chime in once u r done - but mostly our thoughts match)
          17:40 <jmhsieh> and there needs to be a table defined that knows how to read the data (this is where the SerDe comes in), and that explains how data is partitioned
          17:41 <jmhsieh> The hooks in HiveNotifying* do the first part – that is where flume knows it needs to notify hive; currently it's just a stub.
          17:41 <jmhsieh> the latter part – defining the table is probably something to do manually and document in the docs.
          ...
          17:42 <aphadke> 1 - defining table should be manual and mentioned in docs.
          17:43 <hammer> good to see you guys moving forward
          17:43 <aphadke> hammer:
          17:43 <aphadke> 2 - directory structure for FLUME is fantastic for HIVE…. no worries there
          17:44 <aphadke> 3 - HiveNotifying* knows that the file has been written, which is good….
          17:44 <aphadke> take a look at http://svn.apache.org/viewvc/hadoop/hive/trunk/service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java?view=markup
          17:45 <aphadke> the thrift API allows us to load data inside hive.. i am not exactly sure the benefits of SerDe as against using the thrift api…. the api should essentially do the SerDe thing for us
          17:46 <aphadke> its prolly not a good idea for FLUME to change the log data, HIVE allows regex while creating the table… so the raw log file should just load in the table as per the regex and again, the thrift API should take care of that.
          17:46 <jmhsieh> aphadke: ah! I didn't know what the hive side looked like. the thrift stuff starts to make sense now..
          17:47 <jmhsieh> aphadke: yeah, I assumed that flume would be writing out raw logs
          17:47 <aphadke> so essentially, we need to add thrift API to HiveNotifying*, read the gzip'ed log files and load them based on date format + partition
          17:48 <jmhsieh> aphadke: prolly need to talk to carl or one of the hive guys to get pointers to exactly how adding a partition works. I think if you use dates in the path of flumes output dirs, it is just a matter of letting hive know about the structure.
          17:49 <aphadke> jmhsieh: hql syntax would be :
          17:49 <aphadke> LOAD DATA INPATH '<somepath>' INTO TABLE test_table PARTITION (ds='2010-06-17');
          17:49 <jmhsieh> aphadke: I'd prefer if the thrift thing implemented the HiveDirCreatedHandler interface
          17:49 <jmhsieh> aphadke: hql stuff looks simple enough
          17:50 <aphadke> jmhsieh: afaik, thrift reads the file from HDFS, creates the directory structure inside hive and moves file from HDFS to hive directory structure
          17:51 <jmhsieh> aphadke: I think the hive stuff actually uses the data inplace (without moving it) – but I'm not completely sure about this.
          17:51 <aphadke> it definitely moves it..
          17:51 <jmhsieh> ok
          17:52 <aphadke> i.e. data from /user/aphadke/someLogs/2010-07-07 is moved to /user/hive/warehouse/<table_name>/partition/ etc.
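
          Pulling the discussion above together, here is a hedged sketch of driving both manual steps through the hive thrift server, in the spirit of the TestHiveServer.java test linked above. The table name, regex, and paths are placeholders, and the exact client classes are assumptions based on the hive API of this era:

              import org.apache.thrift.protocol.TBinaryProtocol;
              import org.apache.thrift.transport.TSocket;

              import org.apache.hadoop.hive.service.HiveClient;

              public class FlumeHiveLoadSketch {
                public static void main(String[] args) throws Exception {
                  // Connect to a hive server on the default thrift port.
                  TSocket transport = new TSocket("localhost", 10000);
                  transport.open();
                  HiveClient client = new HiveClient(new TBinaryProtocol(transport));

                  // One-time manual step: a partitioned table whose SerDe can
                  // parse the raw log lines flume writes out. Table name and
                  // regex are placeholders.
                  client.execute(
                      "CREATE TABLE test_table (host STRING, body STRING) "
                    + "PARTITIONED BY (ds STRING) "
                    + "ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe' "
                    + "WITH SERDEPROPERTIES ('input.regex' = '([^ ]*) (.*)') "
                    + "STORED AS TEXTFILE");

                  // Per-directory step: point hive at the dir flume just finished.
                  // Note this moves the files into hive's warehouse, as discussed.
                  client.execute(
                      "LOAD DATA INPATH '/user/flume/logs/2010-06-17' "
                    + "INTO TABLE test_table PARTITION (ds='2010-06-17')");

                  transport.close();
                }
              }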

          Jonathan Hsieh added a comment -

          17:57 <jmhsieh> aphadke: there may be a race – flume may still be writing to a file in the dir, and if a hive thing kicks off and moves files, one of the two may get confused.
          17:58 <aphadke> what if we change the notifier to get invoked after it's done writing to disk.. i am positive CustomDfsSink.java has a close method.
          17:58 <aphadke> we can add the hook after the close() to avoid the race
          17:58 <jmhsieh> That can work – but there could still be more data being written into the dirs later – (something failed, and it ends up arriving late).

          Anurag Phadke added a comment -

          Patch will include the following:
          • add an argument to collectorSink for the table name, or make it part of the config file
          • hook up HiveMetaStoreClient.add_partition() once the file is written to disk, i.e. call it after the close() call inside CustomDfsSink.java (or equivalent) – see the sketch below
          Assumptions: the table has already been created inside HIVE and uses the correct SerDe format.
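
          A hedged sketch of that hook, assuming the metastore client API of this era. The database name, table name, and partition value are placeholders; appendPartition() is used here as a shortcut that builds the Partition object for you, where add_partition() would take a fully populated Partition instead:

              import java.util.Arrays;

              import org.apache.hadoop.hive.conf.HiveConf;
              import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

              public class HivePartitionNotifier {
                private final HiveMetaStoreClient client;
                private final String table; // assumed to come from the collectorSink argument

                public HivePartitionNotifier(String table) throws Exception {
                  this.client = new HiveMetaStoreClient(new HiveConf());
                  this.table = table;
                }

                // Call after the sink's close(), once the file is safely on
                // disk, to avoid the race discussed above.
                public void partitionReady(String ds) throws Exception {
                  client.appendPartition("default", table, Arrays.asList(ds));
                }
              }

          With this in place the sink would call something like new HivePartitionNotifier("test_table").partitionReady("2010-06-17") from its close path; as noted above, late-arriving data would still miss the notification.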

          Jonathan Hsieh added a comment -

          Can we split this jira into two pieces or use subtasks?
          Maybe
          1) the documentation for the manual hive parts,
          2) the code patches necessary to do the "automatic part"

          Patrick Hunt added a comment -

          Moving to 0.9.2, not a blocker for 0.9.1.

          Mike Percy added a comment -

          don't see a patch anywhere


            People

            • Assignee: Unassigned
            • Reporter: Anurag Phadke
            • Votes: 7
            • Watchers: 11
