HIVE-4196: Support for Streaming Partitions in Hive

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 0.10.1
    • Fix Version/s: None
    • Component/s: Database/Schema, HCatalog
    • Labels: None
    • Tags: Streaming, HCatalog

      Description

      Motivation: Allow Hive users to immediately query data streaming in through clients such as Flume.

      Currently Hive partitions must be created after all the data for the partition is available. Thereafter, data in the partitions is considered immutable.

      This proposal introduces the notion of a streaming partition, into which new files can be committed periodically and made available for queries before the partition is closed and converted into a standard partition.

      The admin enables a streaming partition on a table using DDL, providing the following pieces of information:

      • Name of the partition in the table on which streaming is enabled
      • Frequency at which the streaming partition should be closed and converted into a standard partition.

      Tables with streaming partitions enabled will be partitioned by one and only one column. It is assumed that this column will contain a timestamp.
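
      Purely as an illustrative sketch (the proposal does not pin down the DDL syntax, and the draft patch attached later in this issue uses temporary REST calls instead), enabling streaming might look like setting hypothetical table properties, borrowing the 'sdb' database and 'log' table from the usage examples further down:

      # Hypothetical example only: the 'streaming.*' property names are invented for illustration.
      hcat -e "use sdb; alter table log set tblproperties ('streaming.partition.column'='date', 'streaming.roll.frequency.seconds'='3600');"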

      Closing the current streaming partition converts it into a standard partition. Based on the specified frequency, the current streaming partition is closed and a new one created for future writes. This is referred to as 'rolling the partition'.

      A streaming partition's life cycle is as follows:

      • A new streaming partition is instantiated for writes.
      • Streaming clients request (via webhcat) an HDFS file name into which they can write a chunk of records for a specific table.
      • Streaming clients write a chunk (via webhdfs) to that file and commit it (via webhcat). Committing merely indicates that the chunk has been written completely and is ready for serving queries. (A curl sketch of this flow follows the list.)
      • When the partition is rolled, all committed chunks are swept into a single directory and a standard partition pointing to that directory is created. The streaming partition is closed and a new streaming partition is created. Rolling the partition is atomic, and streaming clients are agnostic of partition rolling.
      • Hive queries will be able to query the partition that is currently open for streaming. Only committed chunks will be visible, and read consistency will be ensured so that repeated reads of the same partition are idempotent for the lifespan of the query.
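
      A rough sketch of this client flow, using the webhcat streaming endpoints from the draft patch attached later in this issue (host, port, paths, and parameters are placeholders; the exact API was never finalized):

      # 1) Ask webhcat for a chunk file to write into (parameters trimmed for brevity)
      curl "http://localhost:50111/templeton/v1/streaming/chunkget?database=sdb&table=log"

      # 2) Write the chunk of records to the returned HDFS file via webhdfs (or any HDFS API)

      # 3) Commit the chunk so it becomes visible to queries
      curl "http://localhost:50111/templeton/v1/streaming/chunkcommit?database=sdb&table=log&chunkfile=<path returned in step 1>"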

      Partition rolling requires an active agent/thread running to check when it is time to roll and to trigger the roll. This could be achieved either by using an external agent such as Oozie (preferred) or an internal agent.
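
      For illustration only (assuming the partitionroll endpoint from the draft patch attached later in this issue), an external trigger could be as simple as a cron entry, with an Oozie coordinator being the preferred production equivalent:

      # Roll the streaming partition once an hour; the partition value here is just a placeholder.
      0 * * * * curl "http://localhost:50111/templeton/v1/streaming/partitionroll?database=sdb&table=log&partition_column=date&partition_value=<new partition value>"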

          Activity

          Brock Noland added a comment -

          Hi Roshan,

          Looks like a good proposal and a great place for Flume to integrate with Hive! In the proposal, how come we have the clients using webhdfs to write a chunk of data? Couldn't the client use any HDFS API?

          Brock

          Alan Gates added a comment -

          The client could certainly use any HDFS API to talk with HDFS. The webhcat API will return a URL that works with webhdfs, but nothing prevents a user from stripping the http://.../webhdfs/v1 stuff off and invoking HDFS directly. We're optimizing assuming a REST user, but not excluding others.

          eric baldeschwieler added a comment -

          Maybe we should just return both?

          Roshan Naik added a comment -

          Adding phase 1 design document to solicit feedback.

          Roshan Naik added a comment -

          Updating design doc.

          Roshan Naik added a comment -

          Draft patch for review, based on the phase mentioned in the design doc. It deviates slightly:

          1) Adds a couple of (temporary) REST calls to enable/disable streaming on a table. Later these will be replaced with DDL support.

          2) Also, all HTTP methods are GET, for easy testing with a web browser.

          3) Authentication is disabled on the new streaming HTTP methods.

          Usage examples on a db named 'sdb' and a table named 'log':

          1) Setup db & table with single partition column 'date':
          hcat -e "create database sdb; use sdb; create table log(msg string, region string) partitioned by (date string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE; "

          2) To check streaming status:
          http://localhost:50111/templeton/v1/streaming/status?database=sdb&table=log

          3) Enable Streaming:
          http://localhost:50111/templeton/v1/streaming/enable?database=sdb&table=log&col=date&value=1000

          4) Get Chunk File to write to (the chunk data itself is then written to this file via webhdfs; see the note after these examples):
          http://localhost:50111/templeton/v1/streaming/chunkget?database=sdb&table=log&schema=blah&format=blah&record_separator=blah&field_separator=blah

          5) Commit Chunk File:
          http://localhost:50111/templeton/v1/streaming/chunkcommit?database=sdb&table=log&chunkfile=/user/hive/streaming/tmp/sdb/log/2

          6) Abort Chunk File:
          http://localhost:50111/templeton/v1/streaming/chunkabort?database=sdb&table=log&chunkfile=/user/hive/streaming/tmp/sdb/log/3

          7) Roll Partition:
          http://localhost:50111/templeton/v1/streaming/partitionroll?database=sdb&table=log&partition_column=date&partition_value=3000
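
          The write of the chunk data itself (between steps 4 and 5) does not go through a streaming endpoint; the client writes to the returned chunk file via webhdfs (or any HDFS API, per the discussion above). A minimal webhdfs sketch, with host and file name as placeholders, using the standard two-step create (redirect to a datanode, then upload):

          curl -i -X PUT "http://<namenode>:50070/webhdfs/v1/user/hive/streaming/tmp/sdb/log/2?op=CREATE"
          curl -i -X PUT -T chunk.csv "<datanode redirect location returned by the previous call>"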

          Alan Gates added a comment -

          Roshan,

          Could you post a pdf version of the doc so users without MS Word can read it? Also, could you post a version of the patch without the thrift generated code (anything under src/gen or src-gen) so it's easier for the reviewers to determine what to review?

          Roshan Naik added a comment -

          PDF version of the design & spec doc.

          Alan Gates added a comment -

          Some comments:

          According to the Hive coding conventions lines should be bounded at 100 characters. Many lines in this patch exceed that.

          In ObjectStore.java:

          • I'm surprised to see that streamingStatus sets the chunk id for the table. This seems to be a status call. Why should it be setting chunk id?
          • The logic at the end of these functions doesn't look right. Take getNextChunkID for example. If commitTransaction fails (line 2132), rollback will be called but the next chunk id will still be returned. It seems you need a check on success after the commit. I realize many of the calls in the class follow this pattern, but it doesn't seem right.

          In HiveMetaStoreClient.java, is assert what you want? Are you ok with the validity of the arguments not being checked most of the time?

          I'm trying to figure out whether the chunk files are moved, deleted, or left alone during the partition rolling. From examining the code and playing with Hive it looks like the files will be left alone. But have you tested this?

          Which leads to, I don't see any tests in this patch. This code needs a lot of tests.

          Roshan Naik added a comment -

          According to the Hive coding conventions lines should be bounded at 100 characters. Many lines in this patch exceed that.

          Will fix the ones which are not in the thrift generated files.

          I'm surprised to see that streamingStatus sets the chunk id for the table.

          Seems like a bug. Will fix.

          The logic at the end of of these functions doesn't look right. Take getNextChunkID for example. If commitTransaction fails (line 2132) rollback will be called but the next chunk id will still be returned. It seems you need a check on success after commit. I realize many of the calls in the class follow this, but it doesn't seem right.

          Good catch. At the time I thought commitTxn() would only fail with an exception and never return false. But on closer inspection there is indeed a corner case (if rollback was called) where it also returns false. It's a bizarre thing for a function to fail both with and without exceptions, but for now I will fix my code to live with it.

          In HiveMetaStoreClient.java, is assert what you want? Are you ok with the validity of the arguments not being checked most of the time?

          Not all checks are in place. Some checks will happen at lower layers, some at higher layers. Will be adding more checks.

          I'm trying to figure out whether the chunk files are moved, deleted, or left alone during the partition rolling.

          That would depend on whether the table is defined to be an external or internal table. It is essentially an add_partition of the new partition. It calls HiveMetastore.add_partition_core_notxn() inside a transaction.

          Ashutosh Chauhan added a comment -

          Few high level comments:

          • We should try to eliminate the need of intermediate staging area while rolling on new partitions. Seems like there should not be any gotchas while moving data from streaming dir to partition dir directly.
          • We should make the Thrift APIs in the metastore forward compatible. One way to do that is to use a struct (which contains all parameters) instead of passing in a list of arguments.
          • We should try to leave the TBLS table untouched in the backend db. That will simplify the upgrade story. One way to do that is to have all new columns in a new table and then add constraints for this new table.
          Roshan Naik added a comment -

          Thanks, Ashutosh. Since your recommendations apply to subtask HIVE-5138, I have copied your comments over to it. I will address them there.

          Roshan Naik added a comment -

          In view of HIVE-5317, which brings insert/update/delete support to Hive, introducing streaming partitions is no longer necessary. Streaming support can be provided with far less complexity by leveraging HIVE-5317.

          Roshan Naik added a comment -

          Moving the streaming work to a new JIRA, HIVE-5687, since it will be based on a different design.


            People

            • Assignee: Roshan Naik
            • Reporter: Roshan Naik
            • Votes: 1
            • Watchers: 9
