HIVE-645

A UDF that can export data to JDBC databases.

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.5.0
    • Component/s: Contrib, UDF
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    • Release Note:
      Provides DBOutputUDF

      Description

      A UDF that can export data to JDBC databases.
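
      A minimal usage sketch, assembled from commands that appear in the comments
      below (the function class name, the dboutput() argument order of JDBC URL,
      user, password, prepared statement, then bind parameters, and the need to add
      both the contrib jar and a JDBC driver jar are all taken from this thread; the
      table and column names are illustrative):

        -- load the contrib UDF and a JDBC driver, then register dboutput()
        add jar build/contrib/hive_contrib.jar;
        add jar mysql-connector-java-5.1.8.jar;
        CREATE TEMPORARY FUNCTION dboutput
          AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput';

        -- push one aggregated row per (day, httpstatus) into an external MySQL table;
        -- dboutput() returns 0 per row on success and a non-zero code on failure
        SELECT dboutput('jdbc:mysql://localhost:3306/analytics', 'user', 'password',
                        'INSERT INTO status_count (date, status, count) VALUES (?,?,?)',
                        day, httpstatus, count(1))
        FROM web_logs
        GROUP BY day, httpstatus;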

      1. hive-645.patch
        6 kB
        Edward Capriolo
      2. hive-645-2.patch
        6 kB
        Edward Capriolo
      3. hive-645-3.patch
        12 kB
        Edward Capriolo
      4. hive-645-4.patch
        13 kB
        Edward Capriolo
      5. hive-645-5.patch
        13 kB
        Edward Capriolo
      6. hive-645-6.patch
        14 kB
        Edward Capriolo

          Activity

          Edward Capriolo added a comment -

          First Draft patch for comments.

          Edward Capriolo added a comment -

          Better use of object inspectors.

          Namit Jain added a comment -

          Actually, this is more of a user-defined procedure than a user-defined function.
          Should we support them in some other way than as a select list item?

          Something, like

          begin
          proc();
          end;

          Edward Capriolo added a comment -

          Need ; in test case

          Edward Capriolo added a comment -

          Namit,

          I could imagine this task as a user-defined function, though I do not think it has to be implemented that way. Would stored procedures be implemented in a TSQL-like language or in Java code? Is this viable in the near or mid term?

          Namit Jain added a comment -

          It was just a thought - Oracle distinguishes between user-defined functions and procedures.
          User-defined functions are part of the select list, and procedures are not.

          Having said that, they are compiled and executed natively. Since we have no plans of doing that (at least for some time),
          we can ignore it.

          I just wondered whether we need a construct for the case where there is no return value.

          Another slightly related issue is that there may not be a table; so, should we have an equivalent of a table function?
          Raghu has filed a jira for that as well - we should think about all of them and then decide.

          The proposed user-defined procedure syntax does not take a table name, since it is not part of a select list, which
          may be a common scenario.

          Ashish Thusoo added a comment -

          One more thing. Having side effects in UDFs is generally not a good idea, especially because mappers may fail and spurious rows may then appear in the target data set, since each UDF invocation is protected by its own transaction rather than the whole chunk of rows being protected by a single transaction.

          Edward Capriolo added a comment -

          @Ashish,

          I see your logic on this. My thinking is that key constraints on the destination table should protect against most cases of erroneous data by simply failing on a duplicate key. The user of this UDF would have to make the call whether that is acceptable for them. Even with a transactional database, million-row transactions are not practical; any process could fail midway and leave artifacts.

          In any case, I would like to close up this and some other smaller issues I have open. So I see a few courses of action here:

           • get a new use-case scenario with a more concrete coding direction and re-code / create some other utility
           • add dboutput as a udf
           • add dboutput as a contrib udf
           • don't add it, close this jira, and host it myself.

          How should we proceed?

          Namit Jain added a comment -

          I think it can have very serious side-effects.

          What is your use-case? If you insert through this UDF, it may end up inserting more rows than you want. How are you handling failures of mappers/reducers?

          Based on that, we can take the next action.

          Edward Capriolo added a comment -

          My jiras got a lot of attention today. I feel like the most popular guy on the internet. There may be some syntax inaccuracy here, but this is my use case.

          As input I have a hive table with weblogs. It may be partitioned or not.
          hive_table web_logs
          date,time,url,httpstatus
          2009-06-04,05:49:00,/index.jsp, 200
          2009-06-04,05:50:00,/index.jsp, 200
          2009-06-04,05:49:00,/indexsfg.jsp, 404

          I wish to produce a report of status codes per day:
          2009-06-04, 200,2
          2009-06-04, 404,1

          I create a mysql table for this data
          create table status_count(
          string date,
          int status,
          int count,
          primary key (date,status)
          );

          select dboutput(
          'jdbc://localhost:3306/hivedump',
          'insert into status_count (date,status,count) values (?,?,?)',
          date,httpstatus, count(1) ) from web_logs group by date,httpstatus;

          In my case mysql primary key constraints deal with failed mappers/reducers. After all, if a mapper runs again it will try to insert again and fail. Not a problem. In most cases the data hive is dumping to mysql is relational and should have a primary key.

          If the data does not have a primary key then yes, dboutput() will not function correctly with failed mappers/reducers, but its intended use case is to push a summary to a mysql database, possibly 100 - 100,000 rows.
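
          For reference, the create table statement above is not valid MySQL as
          written (the types precede the column names); a corrected sketch of the
          intended table, assuming MySQL, would be:

            create table status_count (
              date   varchar(10),
              status int,
              count  int,
              primary key (date, status)
            );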

          Namit Jain added a comment -

          OK - you can put it into contrib.

          Also, can you add a testcase which will have this failure behavior? You don't need to have a failed mapper, but can insert the same row twice. In that case,
          the second insert will fail and we will get back a return value of 1 - it would be good to verify that only 1 row is inserted.
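
          A sketch of what such a test could look like in HiveQL (the Derby URL and
          the app_info table follow the pattern visible in later comments; it assumes
          kkey is declared as a primary key, and it is illustrative rather than the
          committed test):

            -- first call should succeed and return 0
            SELECT dboutput('jdbc:derby:../build/test_dboutput_db', '', '',
                            'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',
                            'dupe_key', 'some_value')
            FROM src LIMIT 1;

            -- second call inserts the same key, violates the primary key, and should
            -- return 1; app_info should still hold a single row for dupe_key
            SELECT dboutput('jdbc:derby:../build/test_dboutput_db', '', '',
                            'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',
                            'dupe_key', 'some_value')
            FROM src LIMIT 1;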

          Namit Jain added a comment -

          Also, in your test, can you add a case where the table columns are used as parameters - the current test takes constants.

          Edward Capriolo added a comment -

          Moved the gen-udf to the contrib package. Also added the test cases that Namit suggested. The first test case uses static values. The second selects keys and values from the source table to be "dboutputed".

          Zheng Shao added a comment -

          One last comment: Can you add a summary of this discussion (failure cases, etc) to the javadoc comment at the beginning of the GenericUDFDBOutput function?
          Just want to make sure users know the limitations.

          Edward Capriolo added a comment -

          Patch adds

          • Apache license header
          • Comments as suggested to warn/explain behavior with component failure.
          Edward Capriolo added a comment -


          Shamefully made four spelling/grammatical errors in less than two paragraphs of text. Very sad.

          Raghotham Murthy added a comment -

          In HIVE-758 we made hbase_put a UDAF. It returns the total number of rows loaded. Does it make sense to have functions that load data into external systems as UDAFs instead of UDFs with LIMIT 1? The disadvantage is the additional reducer. But, I guess if the information returned (like total number of rows) is useful, it might be well worth it and not too much overhead.

          Edward Capriolo added a comment -

          Raghotham,

          This does not require limit 1. My test case just uses limit 1 to specify static data. As with the second query, the number of rows selected is returned. You could add another level of subqueries to count up the number of 0's returned.

          So this:

          SELECT
          dboutput('jdbc:derby:../build/test_dboutput_db','','',
          'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',key,value)
          

          Becomes something like this: (bad syntax but you get the idea)

          from (
          SELECT
          dboutput('jdbc:derby:../build/test_dboutput_db','','',
          'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)',key,value) as x
          ) sum (x) where x=0
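
          A cleaned-up sketch of that wrapper (illustrative only; it assumes a source
          table src as in the test, and counts how many rows loaded successfully by
          checking for a 0 return code):

            SELECT count(1) AS rows_loaded
            FROM (
              SELECT dboutput('jdbc:derby:../build/test_dboutput_db', '', '',
                              'INSERT INTO app_info (kkey,vvalue) VALUES (?,?)', key, value) AS x
              FROM src
            ) t
            WHERE t.x = 0;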
          

          On a side note, I always try to release some patches along the way so people know what I am up to and can offer suggestions. I spent a lot of time with HIVE-487 on reflection and then we went with the shims. It is a lot of throwaway work if I switch from UDF to UDAF at this point, since I am pretty much at the phase where I am dotting i's and crossing t's.

          He Yongqiang added a comment -

          Hi Edward, how do you deal with db connections? I saw you create one connection and close it for each row. Have you tested it with large data sets?

          Edward Capriolo added a comment -

          He,

          Read above a little bit. My use case is small data sets.

          If opening connections is an issue, connection pooling would help with that.

          Luis Ramos added a comment -

          Hi, thank you for this patch; this is what I was trying to do. I'm trying to run a sample query but get errors:

          Output:
          SELECT dboutput('jdbc://localhost:3306/analytics','root','kdjf','INSERT INTO status_count (date,status,count) VALUES (?,?,?)',day,httpstatus,count(1)) FROM web_logs GROUP BY day, httpstatus;
          Total MapReduce jobs = 1
          Number of reduce tasks not specified. Estimated from input data size: 1
          In order to change the average load for a reducer (in bytes):
          set hive.exec.reducers.bytes.per.reducer=<number>
          In order to limit the maximum number of reducers:
          set hive.exec.reducers.max=<number>
          In order to set a constant number of reducers:
          set mapred.reduce.tasks=<number>
          Starting Job = job_200908201702_0066, Tracking URL = http://verizonmaster:50030/jobdetails.jsp?jobid=job_200908201702_0066
          Kill Command = /home/hadoop/hadoop-0.18.3/bin/../bin/hadoop job -Dmapred.job.tracker=verizonmaster:50121 -kill job_200908201702_0066
          2009-08-21 09:20:33,914 map = 0%, reduce = 0%
          2009-08-21 09:20:36,933 map = 50%, reduce = 50%
          2009-08-21 09:20:39,953 map = 100%, reduce = 100%
          Ended Job = job_200908201702_0066 with errors
          FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.ExecDriver

          Any Ideas? Thank You.

          Edward Capriolo added a comment -

          Luis,

          Try turning the logging level up to debug; you will then know whether the error is happening inside the UDF.

          Edward Capriolo added a comment -

          Luis,
          I have set up many similar experiments and have not had a problem. Looking at your dump...

          jdbc://localhost:3306/analytics
          

          Are you sure you have the proper prefix? Do you have the driver loaded with 'add jar'?

          jdbc:mysql://localhost:3306/analytics
          

          Even with the improper prefix and missing jar the result set should be all:

          1
          1
          1
          ....
          
          FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.ExecDriver
          

          Leads me to believe some other problem is happening.

          Please let me know.

          Luis Ramos added a comment -

          Edward,

          Adding the jar got rid of the ExecDriver failure, thanks. HOWEVER, I still don't see any difference on mysql (it doesn't insert or anything).

          I did these steps:
          1. add jar build/contrib/hive_contrib.jar;
          2. CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput';
          3. SELECT dboutput('jdbc:mysql://localhost:3306/analytics','root','password','INSERT INTO status_count(date,status,count) VALUES (?,?,?)',day,httpstatus,count(1)) FROM web_logs GROUP BY day, httpstatus;

          On my hive.log I see:
          WARN mapred.JobClient (JobClient.java:configureCommandLineOptions(510)) - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

          Output to console is:
          $ hive -hiveconf hive.root.logger=INFO,console
          hive> SELECT dboutput('jdbc:mysql://localhost:3306/analytics','root','password','INSERT INTO status_count(date,status,count) VALUES (?,?,?)',day,httpstatus,count(1)) FROM web_logs GROUP BY day, httpstatus;
          09/08/24 22:21:00 INFO parse.ParseDriver: Parsing command: SELECT dboutput('jdbc:mysql://localhost:3306/analytics','root','password','INSERT INTO status_count(date,status,count) VALUES (?,?,?)',day,httpstatus,count(1)) FROM web_logs GROUP BY day, httpstatus
          09/08/24 22:21:00 INFO parse.ParseDriver: Parse Completed
          09/08/24 22:21:00 INFO parse.SemanticAnalyzer: Starting Semantic Analysis
          09/08/24 22:21:00 INFO parse.SemanticAnalyzer: Completed phase 1 of Semantic Analysis
          09/08/24 22:21:00 INFO parse.SemanticAnalyzer: Get metadata for source tables
          09/08/24 22:21:00 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=web_logs
          09/08/24 22:21:00 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
          09/08/24 22:21:00 INFO metastore.ObjectStore: ObjectStore, initialize called
          09/08/24 22:21:00 INFO metastore.ObjectStore: Initialized ObjectStore
          09/08/24 22:21:00 INFO hive.log: DDL: struct web_logs { string day, string time, string url, string httpstatus}
          09/08/24 22:21:00 INFO parse.SemanticAnalyzer: Get metadata for subqueries
          09/08/24 22:21:00 INFO parse.SemanticAnalyzer: Get metadata for destination tables
          09/08/24 22:21:00 INFO parse.SemanticAnalyzer: Completed getting MetaData in Semantic Analysis
          09/08/24 22:21:00 INFO ppd.OpProcFactory: Processing for FS(34)
          09/08/24 22:21:00 INFO ppd.OpProcFactory: Processing for SEL(33)
          09/08/24 22:21:00 INFO ppd.OpProcFactory: Processing for GBY(32)
          09/08/24 22:21:00 INFO ppd.OpProcFactory: Processing for RS(31)
          09/08/24 22:21:00 INFO ppd.OpProcFactory: Processing for GBY(30)
          09/08/24 22:21:00 INFO ppd.OpProcFactory: Processing for SEL(29)
          09/08/24 22:21:00 INFO ppd.OpProcFactory: Processing for TS(28)
          09/08/24 22:21:00 INFO parse.SemanticAnalyzer: Completed partition pruning
          09/08/24 22:21:00 INFO parse.SemanticAnalyzer: Completed sample pruning
          09/08/24 22:21:00 INFO parse.SemanticAnalyzer: Completed plan generation
          09/08/24 22:21:00 INFO ql.Driver: Semantic Analysis Completed
          09/08/24 22:21:00 INFO ql.Driver: Starting command: SELECT dboutput('jdbc:mysql://localhost:3306/analytics','root','password','INSERT INTO status_count(date,status,count) VALUES (?,?,?)',day,httpstatus,count(1)) FROM web_logs GROUP BY day, httpstatus
          Total MapReduce jobs = 1
          09/08/24 22:21:00 INFO ql.Driver: Total MapReduce jobs = 1
          09/08/24 22:21:00 INFO exec.ExecDriver: BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=108
          Number of reduce tasks not specified. Estimated from input data size: 1
          09/08/24 22:21:00 INFO exec.ExecDriver: Number of reduce tasks not specified. Estimated from input data size: 1
          In order to change the average load for a reducer (in bytes):
          09/08/24 22:21:00 INFO exec.ExecDriver: In order to change the average load for a reducer (in bytes):
          set hive.exec.reducers.bytes.per.reducer=<number>
          09/08/24 22:21:00 INFO exec.ExecDriver: set hive.exec.reducers.bytes.per.reducer=<number>
          In order to limit the maximum number of reducers:
          09/08/24 22:21:00 INFO exec.ExecDriver: In order to limit the maximum number of reducers:
          set hive.exec.reducers.max=<number>
          09/08/24 22:21:00 INFO exec.ExecDriver: set hive.exec.reducers.max=<number>
          In order to set a constant number of reducers:
          09/08/24 22:21:00 INFO exec.ExecDriver: In order to set a constant number of reducers:
          set mapred.reduce.tasks=<number>
          09/08/24 22:21:00 INFO exec.ExecDriver: set mapred.reduce.tasks=<number>
          09/08/24 22:21:00 INFO exec.ExecDriver: adding libjars: file:///home/hadoop/hive/build/contrib/hive_contrib.jar
          09/08/24 22:21:00 INFO exec.ExecDriver: Processing alias web_logs
          09/08/24 22:21:00 INFO exec.ExecDriver: Adding input file hdfs://verizonmaster:50120/user/hive/warehouse/web_logs
          09/08/24 22:21:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
          09/08/24 22:21:00 INFO mapred.FileInputFormat: Total input paths to process : 1
          09/08/24 22:21:00 INFO mapred.FileInputFormat: Total input paths to process : 1
          Starting Job = job_200908201702_0102, Tracking URL = http://verizonmaster:50030/jobdetails.jsp?jobid=job_200908201702_0102
          09/08/24 22:21:01 INFO exec.ExecDriver: Starting Job = job_200908201702_0102, Tracking URL = http://verizonmaster:50030/jobdetails.jsp?jobid=job_200908201702_0102
          Kill Command = /home/hadoop/hadoop-0.18.3/bin/../bin/hadoop job -Dmapred.job.tracker=verizonmaster:50121 -kill job_200908201702_0102
          09/08/24 22:21:01 INFO exec.ExecDriver: Kill Command = /home/hadoop/hadoop-0.18.3/bin/../bin/hadoop job -Dmapred.job.tracker=verizonmaster:50121 -kill job_200908201702_0102
          2009-08-24 10:21:02,237 map = 0%, reduce = 0%
          09/08/24 22:21:02 INFO exec.ExecDriver: 2009-08-24 10:21:02,237 map = 0%, reduce = 0%
          2009-08-24 10:21:08,253 map = 50%, reduce = 50%
          09/08/24 22:21:08 INFO exec.ExecDriver: 2009-08-24 10:21:08,253 map = 50%, reduce = 50%
          2009-08-24 10:21:10,265 map = 100%, reduce = 100%
          09/08/24 22:21:10 INFO exec.ExecDriver: 2009-08-24 10:21:10,265 map = 100%, reduce = 100%
          Ended Job = job_200908201702_0102
          09/08/24 22:21:21 INFO exec.ExecDriver: Ended Job = job_200908201702_0102
          09/08/24 22:21:21 INFO exec.FileSinkOperator: Moving tmp dir: hdfs://verizonmaster:50120/tmp/hive-hadoop/942129758/_tmp.10001 to: hdfs://verizonmaster:50120/tmp/hive-hadoop/942129758/_tmp.10001.intermediate
          09/08/24 22:21:21 INFO exec.FileSinkOperator: Moving tmp dir: hdfs://verizonmaster:50120/tmp/hive-hadoop/942129758/_tmp.10001.intermediate to: hdfs://verizonmaster:50120/tmp/hive-hadoop/942129758/10001
          OK
          09/08/24 22:21:21 INFO ql.Driver: OK
          09/08/24 22:21:21 INFO mapred.FileInputFormat: Total input paths to process : 1
          09/08/24 22:21:21 INFO mapred.FileInputFormat: Total input paths to process : 1
          1
          1
          Time taken: 21.03 seconds

          I'm going to try other test cases and different data. I tried sending constants into status_count but that didn't work either. Are there other jars I might be missing?

          Thank You.

          Edward Capriolo added a comment -

          Now you are making progress. The '1' return is a failure code for the UDF. If you set the logging level high enough you should see why.

          Right now I think the problem is you are not doing

          add jar mysql-connector.jar;
          

          We could return different numbers to indicate where the failure happened, but currently 1 indicates a failure (could not load the driver, or a bad query).

          Luis Ramos added a comment -

          Edward,

          How do I set the logging level? I have "hive.root.logger=DEBUG,DRFA" in hive-log4j.properties.

          I followed the steps below, and I also changed the SQL:
          hive> add jar /usr/share/java//usr/share/java/mysql-connector-java-5.1.5.jar;
          Added /usr/share/java//usr/share/java/mysql-connector-java-5.1.5.jar to class path

          hive> add jar build/contrib/hive_contrib.jar;
          Added build/contrib/hive_contrib.jar to class path

          hive> CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput';
          OK
          Time taken: 0.247 seconds

          hive> SELECT dboutput('jdbc:mysql://localhost:3306/analytics','root','mxplay','CREATE TABLE app_info (kkey VARCHAR(255), vvalue VARCHAR(255))') FROM web_logs;
          Total MapReduce jobs = 1
          Number of reduce tasks is set to 0 since there's no reduce operator
          Starting Job = job_200908201702_0125, Tracking URL = http://verizonmaster:50030/jobdetails.jsp?jobid=job_200908201702_0125
          Kill Command = /home/hadoop/hadoop-0.18.3/bin/../bin/hadoop job -Dmapred.job.tracker=verizonmaster:50121 -kill job_200908201702_0125
          2009-08-25 12:52:40,140 map = 0%, reduce = 0%
          2009-08-25 12:52:44,165 map = 50%, reduce = 50%
          2009-08-25 12:52:45,176 map = 100%, reduce = 100%
          Ended Job = job_200908201702_0125
          OK
          1
          1
          1
          Time taken: 7.275 seconds

          Edward Capriolo added a comment -

          Luis,

          The logs will not emit to the console unless you have 'set mapred.job.tracker=local' I think. You can do that ... or....

          You should probably look at the job/task logs from the task tracker. http://verizonmaster:50030/jobdetails.jsp?jobid=job_200908201702_0125

          Or, as a last resort, you can rebuild the UDF with System.out/System.err; that output is definitely written to the task log.

          Also make sure you have speculative execution off.
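
          For reference, a sketch of how speculative execution is typically turned
          off for a session (these are the pre-0.21 mapred.* property names; check
          the equivalents for your Hadoop version):

            set mapred.map.tasks.speculative.execution=false;
            set mapred.reduce.tasks.speculative.execution=false;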

          Luis Ramos added a comment -

          Edward,

          Got it, thank you so much for your help. Navigating the logs in debug mode helped. I think that changing speculative execution to false on hadoop helped as well. Will run more tests and post any unexpected results.

          Thank you.

          Edward Capriolo added a comment -

          I made a small update here to help with debugging. The UDF returns 2 if the connection manager fails (missing driver, bad credentials). The UDF returns 1 if the query fails (duplicate key, bad SQL, etc). Does anyone else have any blocker for going +1?
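
          One way to surface those return codes from a larger run is to group on the
          UDF result, e.g. (a sketch only; the connection details, table, and column
          names are illustrative):

            SELECT ret, count(1)
            FROM (
              SELECT dboutput('jdbc:mysql://localhost:3306/analytics', 'user', 'password',
                              'INSERT INTO status_count (date, status, count) VALUES (?,?,?)',
                              day, httpstatus, count(1)) AS ret
              FROM web_logs
              GROUP BY day, httpstatus
            ) t
            GROUP BY ret;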

          Namit Jain added a comment -

          +1

          looks good - will commit if the tests pass

          Namit Jain added a comment -

          Committed. Thanks Edward

          Luis Ramos added a comment -

          Great, it's been working with no issues so far.

          Luis Ramos added a comment -

          What exactly does "mapred.job.tracker=local" do? And how will it affect the way my master/slave nodes mapreduce a job and run this UDF? I'm seeing some interesting things with dboutput() when I set or unset the job tracker. Thank you.

          Edward Capriolo added a comment -

          Luis,

          With any hadoop process, setting "mapred.job.tracker=local" runs the map and reduce locally, in process. So it is not distributed: one thread for maps, one thread for reduce. The reason you see more output is that normally the map and reduce tasks run on separate nodes; with "mapred.job.tracker=local" you get all the debug output.

          Luis Ramos added a comment -

          Sometimes the dboutput() UDF fails if I don't set "mapred.job.tracker" to "local"; what could be the problem? I have Hive and Hadoop 0.18.3 on the master, and just Hadoop 0.18.3 on the slave. I followed the Multi-Node Clustering for Ubuntu configuration. I have some ideas on what the issue could be, but would like to compare reasoning. Thanks.

          Edward Capriolo added a comment -

          Luis,

          I can take a shotgun approach to your problem and guess what the issues could be.

          1) Network access: do all your nodes have mysql access to the destination server?

          2) Too many simultaneous connections. If your mapred.map.tasks or mapred.reduce.tasks is very high, you might be flooding your mysql server. You might start with mapred.map.tasks=1 and mapred.reduce.tasks=1 and work your way up to higher numbers to see if that is the issue (see the sketch below).
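
          A sketch of that experiment from the Hive CLI (assuming the Hadoop 0.18-era
          property names used elsewhere in this thread):

            set mapred.map.tasks=1;
            set mapred.reduce.tasks=1;
            -- re-run the dboutput() query, then raise the values gradually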

          Can you provide steps to reproduce the issue?

          Luis Ramos added a comment -

          Edward,

          1) I didn't think you needed mysql on the other nodes, but I installed it and I'm able to connect from all my nodes to my destination server (mysql -u dbuser -h master -p). Do I need the jdbc connector jar on hadoop? Or do I need to have hive on my nodes as well?

          2) Changed mapred.map.task and mapred.reduce.task to 1 on all my nodes but I have the same issues.

          What I did to compensate for this: I only call dboutput on a hive table that looks exactly like I want mysql to look, so SELECT * FROM table. Anytime a query needs to map/reduce I get connection issues (error:2). If I change back to mapred.job.tracker=local it works.

          Edward Capriolo added a comment -

          Luis,

          That is quite a strange error. I have had it working on tables without exact schema matching, even in distributed mode. Are you running the latest trunk?

          Maybe someone wants to pitch in here. Are there any possible scope issues that may be happening? We are declaring the connection as a member variable; it could instead be defined local to evaluate(). Could that be a corner case that only shows up in distributed testing?

          Connection connection = null;
          
          Luis Ramos added a comment -

          Finally... it's alive! It was a JAR issue with mysql-connector. I was compiling against both 5.1.5 (ubuntu) and 5.1.8 (download) and loading 5.1.5 into hive (which is why it only worked in local mode)... apparently 5.1.5 doesn't work; when I loaded 5.1.8 everything works as expected. Thanks for the help. I had both jars in my library, so I kept only 5.1.5 and then not even local mode worked, which is how I noticed.


             People

             • Assignee: Edward Capriolo
             • Reporter: Edward Capriolo
             • Votes: 0
             • Watchers: 5
