From 69155e45afdbcad1f35146b9267dc17e2eae2044 Mon Sep 17 00:00:00 2001
From: ted malaska
Date: Mon, 12 Oct 2015 11:23:16 -0400
Subject: [PATCH] HBASE-14158 Add documentation for Initial Release for HBase-Spark Module integration

---
 src/main/asciidoc/_chapters/spark.adoc | 243 +++++++++++++++++++++++++++++++++
 1 file changed, 243 insertions(+)
 create mode 100644 src/main/asciidoc/_chapters/spark.adoc

diff --git a/src/main/asciidoc/_chapters/spark.adoc b/src/main/asciidoc/_chapters/spark.adoc
new file mode 100644
index 0000000..1aec788
--- /dev/null
+++ b/src/main/asciidoc/_chapters/spark.adoc
@@ -0,0 +1,243 @@
+////
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+////
+
+[[spark]]
+= HBase and Spark
+:doctype: book
+:numbered:
+:toc: left
+:icons: font
+:experimental:
+
+http://spark.apache.org/[Apache Spark] is a software framework that is used to process data in memory in a distributed manner, and it is replacing MapReduce in many use cases. Spark itself is out of scope of this document; please refer to the Spark site for more information on the Spark project and its subprojects. This document will focus on four main interaction points between Spark and HBase. Those interaction points are:
+
+1. Basic Spark: The ability to have an HBase Connection at any point in your Spark DAG.
+2. Spark Streaming: The ability to have an HBase Connection at any point in your Spark Streaming application.
+3. Spark Bulk Load: The ability to write directly to HBase HFiles for bulk insertion into HBase.
+4. SparkSQL/DataFrames: The ability to write SparkSQL that draws on tables that are represented in HBase.
+
+The following sections will walk through examples of each of the interaction points just listed.
+
+== Basic Spark
+
+Here we will talk about Spark HBase integration at the lowest and simplest levels. All the other interaction points are built upon the concepts described here.
+
+At the root of all Spark and HBase integration is the HBaseContext. The HBaseContext takes in HBase configurations and pushes them to the Spark executors. This allows us to have an HBase Connection per Spark Executor in a static location. For reference, Spark Executors can be on the same nodes as the Region Servers or on different nodes; there is no dependence on co-location. Think of every Spark Executor as a multi-threaded client application. This allows any Spark Tasks running on the executors to access the shared Connection object.
+
+Here is a simple example of how the HBaseContext can be used. In this example we are doing a foreachPartition on an RDD in Scala.
+
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+...
+
+val hbaseContext = new HBaseContext(sc, config)
+
+rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
+  val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
+  it.foreach((putRecord) => {
+    val put = new Put(putRecord._1)
+    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
+    bufferedMutator.mutate(put)
+  })
+  bufferedMutator.flush()
+  bufferedMutator.close()
+})
+----
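+
+The example above leaves the construction of rdd elided. For illustration only, an RDD with the record shape the example expects, a row key plus an array of (column family, qualifier, value) triples, could be built as in the following sketch; the row keys, the column family c, and the values shown here are made-up sample data.
+
+----
+// Illustrative sample data only: (rowKey, Array[(family, qualifier, value)])
+// records matching the shape used by the hbaseForeachPartition example above.
+val rdd = sc.parallelize(Array(
+  (Bytes.toBytes("1"), Array((Bytes.toBytes("c"), Bytes.toBytes("a"), Bytes.toBytes("value1")))),
+  (Bytes.toBytes("2"), Array((Bytes.toBytes("c"), Bytes.toBytes("a"), Bytes.toBytes("value2")))),
+  (Bytes.toBytes("3"), Array((Bytes.toBytes("c"), Bytes.toBytes("a"), Bytes.toBytes("value3"))))))
+----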
+
+If Java is preferred instead of Scala, the same thing is still very possible, although it looks a little different, as we can see in this example.
+
+----
+JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+try {
+  List<byte[]> list = new ArrayList<>();
+  list.add(Bytes.toBytes("1"));
+  ...
+  list.add(Bytes.toBytes("5"));
+
+  JavaRDD<byte[]> rdd = jsc.parallelize(list);
+  Configuration conf = HBaseConfiguration.create();
+
+  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
+
+  hbaseContext.foreachPartition(rdd,
+      new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
+    public void call(Tuple2<Iterator<byte[]>, Connection> t)
+        throws Exception {
+      Table table = t._2().getTable(TableName.valueOf(tableName));
+      BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
+      while (t._1().hasNext()) {
+        byte[] b = t._1().next();
+        Result r = table.get(new Get(b));
+        if (r.getExists()) {
+          mutator.mutate(new Put(b));
+        }
+      }
+
+      mutator.flush();
+      mutator.close();
+      table.close();
+    }
+  });
+} finally {
+  jsc.stop();
+}
+----
+
+All functionality between Spark and HBase will be supported both in Scala and in Java, with the exception of SparkSQL, which will support any language that is supported by Spark. For the remainder of this documentation we will focus on Scala examples.
+
+The examples above illustrate how to do a foreachPartition with a Connection. A number of other Spark base functions are supported out of the box:
+
+1. BulkPut: For massively parallel sending of puts to HBase
+2. BulkDelete: For massively parallel sending of deletes to HBase
+3. BulkGet: For massively parallel sending of gets to HBase to create a new RDD
+4. MapPartition: To do a Spark Map function with a Connection object to allow full access to HBase
+5. HBaseRDD: To simplify a distributed scan to create an RDD
+
+Examples of all of these functionalities can be found in the HBase-Spark module; a short illustrative sketch of bulkPut follows below.
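+
+As an illustration only, a bulkPut over an RDD of the same record shape as the foreachPartition example above might look like the sketch below. It assumes that the RDD-level hbaseBulkPut takes the same three inputs as the DStream hbaseBulkPut shown in the Spark Streaming section, and the table name t1 is just an example.
+
+----
+// Sketch: send every record of the RDD to HBase as a Put. The function passed
+// in converts one (rowKey, Array[(family, qualifier, value)]) record into a Put.
+rdd.hbaseBulkPut(
+  hbaseContext,
+  TableName.valueOf("t1"),
+  (putRecord) => {
+    val put = new Put(putRecord._1)
+    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
+    put
+  })
+----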
+
+== Spark Streaming
+
+http://spark.apache.org/streaming/[Spark Streaming] is a micro batching stream processing framework built on top of Spark. HBase and Spark Streaming make great companions, in that HBase can provide the following benefits alongside Spark Streaming:
+
+1. A place to grab reference data or profile data on the fly
+2. A place to store counts or aggregates in a way that supports Spark Streaming's promise of exactly-once processing
+
+The HBase-Spark module's integration points with Spark Streaming are very similar to its normal Spark integration points, in that the following commands are possible straight off a Spark Streaming DStream:
+
+1. BulkPut: For massively parallel sending of puts to HBase
+2. BulkDelete: For massively parallel sending of deletes to HBase
+3. BulkGet: For massively parallel sending of gets to HBase to create a new RDD
+4. ForeachPartition: To do a Spark Foreach function with a Connection object to allow full access to HBase
+5. MapPartitions: To do a Spark Map function with a Connection object to allow full access to HBase
+
+Below is an example of bulkPut with DStreams. As you will see, it is very close in feel to the RDD bulkPut.
+
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+val hbaseContext = new HBaseContext(sc, config)
+val ssc = new StreamingContext(sc, Milliseconds(200))
+
+val rdd1 = ...
+val rdd2 = ...
+
+val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
+    Array[Byte], Array[Byte])])]]()
+
+queue += rdd1
+queue += rdd2
+
+val dStream = ssc.queueStream(queue)
+
+dStream.hbaseBulkPut(
+  hbaseContext,
+  TableName.valueOf(tableName),
+  (putRecord) => {
+    val put = new Put(putRecord._1)
+    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
+    put
+  })
+----
+
+There are three inputs to the hbaseBulkPut function:
+
+1. The hbaseContext that carries the broadcast configuration information and links us to the HBase Connections in the executors
+2. The table name of the table we are putting data into
+3. A function that will convert a record in the DStream into an HBase Put object
+
+== Bulk Load
+
+Spark bulk load follows very closely the MapReduce implementation of bulk load. In short, there is a partitioner that partitions based on region splits, and the row keys are sent to the reducers in order so that HFiles can be written out. In Spark terms, the bulk load is focused around a repartitionAndSortWithinPartitions followed by a foreachPartition.
+
+The only major difference between the Spark implementation and the MapReduce implementation is that the column qualifier is included in the shuffle ordering process. This was done because the MapReduce bulk load implementation would have memory issues when loading rows with a large number of columns, since the sorting of those columns was done in the memory of the reducer JVM. Now that the ordering is done in the Spark shuffle, there should no longer be a limit on the number of columns in a row for bulk loading.
+
+Below is a simple example of what bulk loading with Spark looks like.
+
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+val hbaseContext = new HBaseContext(sc, config)
+
+val stagingFolder = ...
+
+rdd.hbaseBulkLoad(TableName.valueOf(tableName),
+  t => {
+   val rowKey = t._1
+   val family:Array[Byte] = t._2(0)._1
+   val qualifier = t._2(0)._2
+   val value = t._2(0)._3
+
+   val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
+
+   Seq((keyFamilyQualifier, value)).iterator
+  },
+  stagingFolder.getPath)
+
+val load = new LoadIncrementalHFiles(config)
+load.doBulkLoad(new Path(stagingFolder.getPath),
+  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+----
+
+You will see in this example that the hbaseBulkLoad function takes three parameters:
+
+1. The table name of the table we intend to bulk load to
+2. A function that will convert a record in the RDD to a tuple key-value pair, with the tuple key being a KeyFamilyQualifier object and the value being the cell value. The KeyFamilyQualifier object will hold the RowKey, Column Family, and Column Qualifier. The shuffle will partition on the RowKey but will sort by all three values.
+3. The temporary path for the HFiles to be written out to
+
+Following the Spark bulk load command, we then use HBase's LoadIncrementalHFiles object to load the newly created HFiles into HBase.
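+
+Note that the load step in this example refers to a Connection named conn and a Table named table that the snippet itself does not create. A minimal sketch of obtaining them, assuming a standard client Connection built from the same configuration, might look like the following.
+
+----
+// Illustrative only: obtain the Connection, Table, and RegionLocator that
+// LoadIncrementalHFiles.doBulkLoad expects, and clean up when the load is done.
+val conn = ConnectionFactory.createConnection(config)
+try {
+  val table = conn.getTable(TableName.valueOf(tableName))
+  val load = new LoadIncrementalHFiles(config)
+  load.doBulkLoad(new Path(stagingFolder.getPath),
+    conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+  table.close()
+} finally {
+  conn.close()
+}
+----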
+
+Now there are advanced options for bulk load with Spark. We can set the following attributes with additional parameter options on hbaseBulkLoad:
+
+1. Max file size of the HFiles
+2. A flag to exclude HFiles from compactions
+3. Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding
+
+Below is a code example of bulk load with these extra parameters.
+
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+val hbaseContext = new HBaseContext(sc, config)
+
+val stagingFolder = ...
+
+val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
+val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
+
+familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)
+
+rdd.hbaseBulkLoad(TableName.valueOf(tableName),
+  t => {
+   val rowKey = t._1
+   val family:Array[Byte] = t._2(0)._1
+   val qualifier = t._2(0)._2
+   val value = t._2(0)._3
+
+   val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)
+
+   Seq((keyFamilyQualifier, value)).iterator
+  },
+  stagingFolder.getPath,
+  familyHBaseWriterOptions,
+  compactionExclude = false,
+  HConstants.DEFAULT_MAX_FILE_SIZE)
+
+val load = new LoadIncrementalHFiles(config)
+load.doBulkLoad(new Path(stagingFolder.getPath),
+  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+----
+
+== SparkSQL/DataFrames
+
+http://spark.apache.org/sql/[SparkSQL] is a subproject of Spark that supports SQL that will compute down to a Spark DAG. In addition, SparkSQL is a heavy user of DataFrames. Think of DataFrames as RDDs with schema information.
+
+The HBase-Spark module has support for SparkSQL and DataFrames, allowing the user to write SparkSQL directly on HBase tables. In addition, the HBase-Spark module will push query filtering logic down to HBase.
+
+=== Predicate Push Down
+
+There are two examples of predicate push down in the HBase-Spark implementation. The first example is the push down of filtering logic on the RowKey. HBase-Spark will reduce the filters on RowKeys down to a set of Get and/or Scan commands. Note that the Scans are distributed scans, not a single client scan operation.
+
+If the query looks something like the following, the logic will push down and get the rows through 3 Gets and 0 Scans. We can do Gets because all the operations are equality operations.
+
+----
+SELECT
+  KEY_FIELD,
+  B_FIELD,
+  A_FIELD
+FROM hbaseTmp
+WHERE (KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')
+----
+
+Now let's look at an example where we will end up doing two scans on HBase.
+
+----
+SELECT
+  KEY_FIELD,
+  B_FIELD,
+  A_FIELD
+FROM hbaseTmp
+WHERE KEY_FIELD < 'get2' or KEY_FIELD > 'get3'
+----
+
+In this example we will get 0 Gets and 2 Scans. One scan will load everything from the first row in the table until "get2", and the second scan will get everything from "get3" until the last row in the table.
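+
+As a rough illustration of what those pushed-down RowKey predicates translate to in plain HBase client terms, consider the sketch below. This is conceptual only; the module issues these as distributed operations from the executors, not as the single-client calls shown here.
+
+----
+// First query: three equality checks on the RowKey become three Gets.
+val gets = Seq("get1", "get2", "get3").map(k => new Get(Bytes.toBytes(k)))
+
+// Second query: KEY_FIELD < 'get2' scans from the start of the table up to,
+// but not including, 'get2'; KEY_FIELD > 'get3' scans from the first row key
+// greater than 'get3' to the end of the table.
+val lowerScan = new Scan()
+lowerScan.setStopRow(Bytes.toBytes("get2"))
+
+val upperScan = new Scan()
+upperScan.setStartRow(Bytes.add(Bytes.toBytes("get3"), Array(0.toByte)))
+----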
+
+The next query is a good example of having a good number of range checks; however, the ranges overlap. The code will be smart enough to get the following data in a single scan that encompasses all the data asked for by the query.
+
+----
+SELECT
+  KEY_FIELD,
+  B_FIELD,
+  A_FIELD
+FROM hbaseTmp
+WHERE
+  (KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or
+  (KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')
+----
+
+The second example of push down functionality offered by the HBase-Spark module is the ability to push down filter logic for column and cell fields. Just like the RowKey logic, all query logic will be consolidated into the minimum number of range checks and equality checks. This is implemented by sending a Filter object along with the Scan that carries information about the consolidated push down predicates.
+
+=== SparkSQL Code Example
+
+The following is an example in Scala of how we can interact with HBase with SQL.
+
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+new HBaseContext(sc, config)
+val sqlContext = new SQLContext(sc)
+
+val df = sqlContext.load("org.apache.hadoop.hbase.spark",
+  Map("hbase.columns.mapping" ->
+    "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b",
+    "hbase.table" -> "t1"))
+
+df.registerTempTable("hbaseTmp")
+
+val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD FROM hbaseTmp " +
+  "WHERE " +
+  "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
+  "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
+----
+
+There are three major parts of this example that deserve explaining.
+
+The first is the sqlContext.load function, which takes two parameters. The first parameter points Spark to the HBase DefaultSource class, which will act as the interface between SparkSQL and HBase.
+
+The second parameter is a map of key-value pairs. In this example the map has two keys, hbase.columns.mapping and hbase.table. The hbase.table key points SparkSQL to the given HBase table. The hbase.columns.mapping key gives us the logic to translate HBase columns to SparkSQL columns.
+
+The hbase.columns.mapping is a string that follows the following format:
+
+----
+(SparkSQL.ColumnName) (SparkSQL.ColumnType) (HBase.ColumnFamily):(HBase.Qualifier)
+----
+
+In the example below we see the definition of three fields. Note that KEY_FIELD has no ColumnFamily; therefore it is the RowKey.
+
+----
+KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b
+----
+
+The next major thing to note in the example is the registerTempTable function. This is a SparkSQL function that registers the DataFrame under the table name "hbaseTmp", which allows us to then query the HBase table directly with SQL rather than with Scala code.
+
+The last major point to note in the example is the sqlContext.sql function. This allows the user to ask their questions in SQL, which will be pushed down to the DefaultSource code in the HBase-Spark module. The result of this command will be a DataFrame with the schema of KEY_FIELD and B_FIELD.
\ No newline at end of file
--
2.3.2 (Apple Git-55)