SPARK-2447

Add common solution for sending upsert actions to HBase (put, deletes, and increment)

    Details

    • Type: New Feature
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Spark Core, Streaming
    • Labels: None

      Description

      Going to review the design with Tdas today.

      But my first thought is to have an extension of VoidFunction that handles the connection to HBase and allows for options such as turning auto-flush off for higher throughput.
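      To make the idea concrete, here is a minimal Scala sketch, assuming a pair RDD of (rowKey, value) byte arrays and a placeholder "cf"/"col" column; it illustrates the pattern only and is not the proposed implementation:

      import org.apache.hadoop.hbase.HBaseConfiguration
      import org.apache.hadoop.hbase.client.{HTable, Put}
      import org.apache.hadoop.hbase.util.Bytes
      import org.apache.spark.rdd.RDD

      // Sketch: one HTable per partition, puts buffered client-side,
      // flushed once at the end for higher throughput.
      def bulkPutSketch(rdd: RDD[(Array[Byte], Array[Byte])], tableName: String): Unit = {
        rdd.foreachPartition { iter =>
          val table = new HTable(HBaseConfiguration.create(), tableName)
          table.setAutoFlush(false) // auto-flush off for higher throughput
          iter.foreach { case (rowKey, value) =>
            val put = new Put(rowKey)
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), value)
            table.put(put)
          }
          table.flushCommits() // send the buffered puts in one batch
          table.close()
        }
      }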

      Need to answer the following questions first.

      • Can it be written in Java or should it be written in Scala?
      • What is the best way to add the HBase dependency? (will review how Flume does this as the first option)
      • What is the best way to do testing? (will review how Flume does this as the first option)
      • How do we support Python? (Python may be a separate Jira; that is unknown at this time)

      Goals:

      • Simple to use
      • Stable
      • Supports high load
      • Documented (may be a separate Jira; need to ask Tdas)
      • Supports Java, Scala, and hopefully Python
      • Supports Streaming and normal Spark

        Issue Links

          Activity

          Ted Malaska added a comment -

          tdas, when you have time can you assign this to me? Thanks.

          Ted Malaska added a comment -

          OK, here is the first beta batch of code. It is not ready for a pull request because I need to add unit tests; I will get those in the coming days.

          But I have tested all the examples on CDH 5.0.2 and found no issues.

          https://github.com/tmalaska/SparkOnHBase

          Ted Malaska added a comment -

          Code review on Thu, July 17.

          At least 14 action items before the next review:
          1. Convert vars to vals
          2. Rename bulkGets to bulkGet, and repeat for the others
          3. Rename the private map method to mapPartition
          4. Add comments for every method
          5. Fix indentation (it isn't correct on all lines)
          6. Close all HTables (I forgot one)
          7. Unit tests for everything
          8. Change the sending of the Configuration to a broadcast, to reduce IO to the workers and reduce start-up time (see the sketch after this list)
          9. Store the HConnection in a static place so that every partition on a worker doesn't have to create its own HConnection
          10. Map of connections (we need to support being able to connect to more than one cluster)
          11. BulkGet needs comments about what is read in and out
          12. The SparkContext should be given to the HBaseContext constructor
          13. Remove the default constructor
          14. Use SerializableWritable in Spark (HadoopRDD as an example)
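          A minimal sketch of item 8, assuming Spark's SerializableWritable wrapper (Hadoop's Configuration is a Writable but not Serializable, so it cannot be broadcast directly):

          import org.apache.hadoop.conf.Configuration
          import org.apache.hadoop.hbase.HBaseConfiguration
          import org.apache.spark.{SerializableWritable, SparkContext}

          def broadcastHBaseConf(sc: SparkContext) = {
            val hbaseConf: Configuration = HBaseConfiguration.create()
            // Wrap the non-serializable Configuration so it is shipped once
            // per worker instead of with every task closure.
            val confBroadcast = sc.broadcast(new SerializableWritable(hbaseConf))
            confBroadcast // on a worker, unwrap with confBroadcast.value.value
          }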

          Ted Malaska added a comment -

          Over the weekend I got the following done:
          1. Converted most vars to vals
          2. Renamed bulkGets to bulkGet, and repeated for the others
          3. Renamed the private map method to mapPartition
          5. Fixed indentation where it wasn't correct
          6. Closed all HTables (I had forgotten one)
          8. Changed the sending of the Configuration to a broadcast, to reduce IO to the workers and reduce start-up time
          9. Stored the HConnection in a static place so that every partition on a worker doesn't have to create its own HConnection (sketched below)
          10. Map of connections (we need to support being able to connect to more than one cluster)
          11. Commented BulkGet about what is read in and out
          12. The SparkContext is now given to the HBaseContext constructor
          13. Removed the default constructor
          14. Used SerializableWritable in Spark (HadoopRDD as an example)

          Extra:
          1. Finished the first cut of the design doc https://github.com/tmalaska/SparkOnHBase/blob/master/SparkOnHBase.Design.Doc.docx
          2. Built support for Spark Streaming
          3. Built a put example for Spark Streaming
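          A rough sketch of the static connection cache from items 9 and 10; the names, the cache key, and the missing eviction logic are illustrative, not the actual HConnectionStaticCache code:

          import java.util.concurrent.ConcurrentHashMap
          import org.apache.hadoop.conf.Configuration
          import org.apache.hadoop.hbase.client.{HConnection, HConnectionManager}

          object ConnectionCacheSketch {
            // One cached HConnection per distinct cluster, shared by every
            // partition running in this worker JVM.
            private val cache = new ConcurrentHashMap[String, HConnection]()

            def getConnection(conf: Configuration): HConnection = synchronized {
              val key = conf.get("hbase.zookeeper.quorum") // illustrative cache key
              var conn = cache.get(key)
              if (conn == null) {
                conn = HConnectionManager.createConnection(conf)
                cache.put(key, conn)
              }
              conn
            }
          }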

          Ted Malaska added a comment -

          Added JavaDoc and the method rename changes.

          This is the list of tasks I have to do next:
          1. Unit testing
          2. Logging
          3. Change the POM and packages to fit into the external folder of Spark

          https://github.com/tmalaska/SparkOnHBase

          Ted Malaska added a comment -

          Getting closer to the first pull request.

          1. Added Apache license headers
          2. Added unit tests for all Spark RDD functions
          3. Updated the packages

          Things to do:
          1. There were some major classpath issues when adding the HBaseTestingUtility class; need to clean up the POM (a mini-cluster test sketch follows this comment)
          2. Need to convert the POM to not use CDH dependencies
          3. Need to add unit tests for Spark Streaming
          4. Need to add unit tests for HConnectionStaticCache

          Latest code and documentation:
          https://github.com/tmalaska/SparkOnHBase
          https://github.com/tmalaska/SparkOnHBase/blob/master/SparkOnHBase.Design.Doc.docx
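          For reference, a bare-bones sketch of the mini-cluster test pattern with HBaseTestingUtility; the table and family names are placeholders:

          import org.apache.hadoop.hbase.HBaseTestingUtility
          import org.apache.hadoop.hbase.util.Bytes

          object MiniClusterSketch {
            def main(args: Array[String]): Unit = {
              val htu = new HBaseTestingUtility()
              htu.startMiniCluster() // spins up in-process ZooKeeper + HBase
              try {
                val table = htu.createTable(Bytes.toBytes("t1"), Bytes.toBytes("c"))
                // ... run bulkPut/bulkGet assertions against the mini cluster here ...
              } finally {
                htu.shutdownMiniCluster()
              }
            }
          }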

          Ted Malaska added a comment -

          Added the first of many pull requests. Please feel free to review.

          https://github.com/apache/spark/pull/1608

          This is the first pull request: mainly to test the review process, but there are still a number of things that I plan to add this week.

          1. Clean up the pom file
          2. Add unit tests for the HConnectionStaticCache

          If I have time I will also add the following:
          1. Support for Java
          2. Additional unit tests for Java
          3. Additional unit tests for Spark Streaming

          Ted Malaska added a comment -

          Making good progress. Just FYI, it may take a little longer because the version of HBase in Spark is 0.94.1, which has a couple of different APIs.

          Ted Malaska added a comment -

          So Spark has HBase 0.94.6 as the default HBase. That API is very different from later versions.

          So the question is: do we still want to code for 0.94.6?

          I will get that working and update the pull request to work for 0.94.6, but on most distributions today we would be using deprecated methods.

          Ted Malaska added a comment -

          Just talked to JMS from HBase, and we don't want to go down this road. The APIs for 0.94.6 won't work with the newer versions. I'm looking into upping the version of HBase in Spark now.

          Ted Malaska added a comment -

          The build is fixed and the pull request is updated.

          Ted Malaska added a comment -

          OK, had a status meeting with TD.

          1. 2447 will be pushed past 1.1
          2. Focus on these tasks:
          2.1. Java
          2.2. More unit testing
          2.3. Partitioned Put
          2.4. Partitioned Sorted Get
          2.5. BulkCheckPut
          2.6. BulkLoad (see the sketch after this list)
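          For 2.6, a sketch of the standard HBase bulk-load pattern such a helper would likely wrap, assuming HFileOutputFormat and pre-sorted (rowKey, KeyValue) pairs; a real helper would also call HFileOutputFormat.configureIncrementalLoad for region-aware partitioning:

          import org.apache.hadoop.conf.Configuration
          import org.apache.hadoop.hbase.KeyValue
          import org.apache.hadoop.hbase.io.ImmutableBytesWritable
          import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
          import org.apache.spark.SparkContext._
          import org.apache.spark.rdd.RDD

          // Write sorted (rowKey, KeyValue) pairs out as HFiles; the resulting
          // directory can then be handed to HBase's completebulkload tool.
          def bulkLoadSketch(rdd: RDD[(ImmutableBytesWritable, KeyValue)],
                             hfileDir: String,
                             conf: Configuration): Unit = {
            rdd.saveAsNewAPIHadoopFile(
              hfileDir,
              classOf[ImmutableBytesWritable],
              classOf[KeyValue],
              classOf[HFileOutputFormat],
              conf)
          }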

          Matei Zaharia added a comment -

          Hey Ted, thanks for putting this together. Apart from Java, can you look into supporting Python too? It seems that the main datatypes in HBase are arrays of bytes, so we can at least expose those in Python.

          Similarly, it might be good to make the Scala and Java API return just those, and let the user convert them after. In general for data sources like this we'd like to keep the code as simple and low-level as possible, so that it has a high chance of continuing to work with future versions of the data source (specifically future versions of HBase here).

          Finally, I'm curious how stable the HBase APIs used here are. What is the lowest version of HBase we can support with this, and are they all promised to be in future versions?

          Ted Malaska added a comment -

          Hey Matei,

          Let's do a WebEx or something in the near future. I would love to get more of your input.

          Here are my answers to your questions above:
          1. Yes, I can do Python.
          2. Yes, I can do that. So to be clear, bulkGet and scan will return a fixed (Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte], Long)]) for (rowKey, Array[(columnFamily, column, value, timestamp)]) (see the type sketch after this comment).
          2.1. As for bulkPut/Increment/Delete/CheckPut, I think we need to give the user the freedom to interact with the raw API. I have no problem building a simpler interface for the 80% use case, but I don't want to fail the 20%.
          3. The lowest version is 0.96. The reason is there was a major API change from 0.94 to 0.96+. So if we need to support 0.94 and below, we need a different code base.

          Let me know if this answers your questions, and let me know if there is anything else I can do. I have learned so much from TD and I have grown so much from this process.

          Ted Malaska
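          In Scala types, the fixed shape from point 2 would look roughly like this (the aliases are illustrative, not from the code):

          object ResultShapeSketch {
            type RowKey = Array[Byte]
            // (columnFamily, column, value, timestamp)
            type CellResult = (Array[Byte], Array[Byte], Array[Byte], Long)
            // what bulkGet and scan would return per row
            type BulkGetResult = (RowKey, Array[CellResult])
          }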

          Patrick Wendell added a comment -

          This is not entirely a duplicate, but it's similar to SPARK-1127.

          Tathagata Das added a comment - edited

          I took a brief look at SPARK-1127 as well. I think both the PRs have their merits. We should consider consolidating the functionalities that they provide.

          The relevant PR is this https://github.com/apache/spark/pull/194/files

          Ted Malaska, can you take a look at this PR as well? I think saveAsHBaseFile is a simpler interface that may be worth supporting if there is enough use of it (it assumes that all rows have the same column structure).

          Ted Malaska added a comment -

          Tell me if I'm wrong, but the core offering of 1127 is also provided by 2447.

          All I would have to do is provide the RDD functions that call bulkPut or a future bulkPartitionPut.

          Tathagata Das added a comment -

          Exactly!! That's why I feel that both have their merits. 2447 provides lower-level, all-inclusive interfaces with which slightly advanced users can do arbitrary things, but it requires programming against HBase types like Put. However, 1127 provides a simple interface that allows not-so-advanced users to do a set of simple things without requiring much HBase knowledge. They are complementary, and the latter should be implemented on top of the former.
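          To illustrate the layering, a hypothetical sketch of a fixed-schema, 1127-style put helper built on top of a lower-level bulkPut (the bulkPut signature is approximated from the SparkOnHBase project):

          import org.apache.hadoop.hbase.client.Put
          import org.apache.hadoop.hbase.util.Bytes
          import org.apache.spark.rdd.RDD

          // Assumed lower-level interface, approximating SparkOnHBase's HBaseContext.
          trait LowLevelHBase {
            def bulkPut[T](rdd: RDD[T], tableName: String, f: T => Put, autoFlush: Boolean): Unit
          }

          // The simple interface: every row has the same column structure, so the
          // user never touches HBase's Put type directly.
          def simpleBulkPut(hc: LowLevelHBase,
                            rdd: RDD[(Array[Byte], Array[Byte])],
                            tableName: String,
                            family: String,
                            qualifier: String): Unit = {
            hc.bulkPut[(Array[Byte], Array[Byte])](rdd, tableName, { case (rowKey, value) =>
              val put = new Put(rowKey)
              put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value)
              put
            }, autoFlush = false)
          }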

          Norman He added a comment -

          Hi Ted,

          I am very glad to see the HBase RDD work. I am probably going to use it in its current form.

          I like the idea of the worker node managing the HBase connection, but somehow I have not seen any code related to HConnectionStaticCache?

          Ted Malaska added a comment -

          Hey Norman,

          Yes, the GitHub project has been used by a couple of clients now. It should be pretty hardened. Let me know if you find any issues.

          I will hopefully run into TD at Hadoop World and work out how to get this into Spark.

          Thanks for the comment.

          Norman He added a comment -

          Hi Ted and Tathagata Das,

          Would Spark/Spark Streaming consider making HBaseContext a facade to enclose all the simple HBase Get/Put methods?

          Ted Malaska added a comment -

          Hey Norman,

          Totally agree. TD and I talked about SparkOnHBase at Hadoop World. Times were crazy leading up to Hadoop World.

          So I'm doing the following things:
          1. I'm writing up a blog post for SparkOnHBase
          2. TD is working on directions for how this code should be integrated with Spark
          3. I have been working out little bugs with the Java integration
          4. I want to build a couple more examples
          5. I'm having a problem with Maven where the Java JUnit tests are not executing
          6. I'm adding support for Kerberos

          But yes, the facade is coming.

          Let me know if you want to help. Just do a pull request on https://github.com/tmalaska/SparkOnHBase

          Norman He added a comment -

          Yes, I would like to help. Let me start with the facade work in Scala first.

          Ted Malaska added a comment -

          Cool thanks.

          Contact me at ted.malaska@cloudera.com and I'll set up a WebEx for some time in the future so we can get this going.

          Thanks

          Patrick Wendell added a comment -

          Hey All,

          I have a question about this: is there any reason this can't exist as a user library instead of being merged into Spark itself? For utility libraries like this, I could see ones coming for Cassandra, Mongo, etc. I don't see it scaling to put and maintain all of these in the Spark code base. At the same time, however, they are super useful.

          As an alternative, what if it lived in HBase, similar to e.g. the Hadoop InputFormat implementation?

          Ted Malaska added a comment -

          I totally understand. I also don't know the answer.

          That is why I made the GitHub project. Thankfully, my employer also sees value in this project, and it will be moving to Cloudera Labs in the coming weeks. All that means is that I will have more help supporting it.

          A blog post and more improvements will be coming in the next few weeks.

          Norman He added a comment -

          Hi Ted,

          I have already made some changes in Scala for the facade and added some tests. Let us discuss early next week. How should I send you the code for review?

          -Norman

          Ted Malaska added a comment -

          Hey guys,

          Just wanted to update this Jira. In summary, the Spark committers are still deciding how this will or will not be included in the external part of Spark.

          For now, because the demand is there and because the solution works, I'm going to host the solution on Cloudera Labs.
          Here is the blog post that walks through the solution:

          http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/


            People

            • Assignee: Ted Malaska
            • Reporter: Ted Malaska
            • Votes: 1
            • Watchers: 17
