diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala new file mode 100644 index 0000000..7dbe140 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark + +import java.io.Serializable + +/** + * This object will hold optional data for how a given column family's + * writer will work + * + * @param compression String to define the Compression to be used in the HFile + * @param bloomType String to define the bloom type to be used in the HFile + * @param blockSize The block size to be used in the HFile + * @param dataBlockEncoding String to define the data block encoding to be used + * in the HFile + */ +class FamilyHFileWriteOptions( val compression:String, + val bloomType: String, + val blockSize: Int, + val dataBlockEncoding: String) extends Serializable diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala index f060fea..d128f1b 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala @@ -17,31 +17,35 @@ package org.apache.hadoop.hbase.spark -import org.apache.hadoop.hbase.TableName +import java.net.InetSocketAddress +import java.util +import java.util.Comparator + +import org.apache.hadoop.hbase.fs.HFileSystem +import org.apache.hadoop.hbase._ +import org.apache.hadoop.hbase.io.compress.Compression +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding +import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder, HFileWriterImpl} +import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, BloomType} +import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.ConnectionFactory -import org.apache.hadoop.hbase.client.Scan -import org.apache.hadoop.hbase.client.Get -import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.hadoop.hbase.client._ import scala.reflect.ClassTag -import org.apache.hadoop.hbase.client.Connection -import org.apache.hadoop.hbase.client.Put -import org.apache.hadoop.hbase.client.Delete -import org.apache.spark.{Logging, SerializableWritable, SparkContext} -import 
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
+import org.apache.spark.{Partitioner, Logging, SerializableWritable, SparkContext}
+import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil, TableInputFormat, IdentityTableMapper}
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.hbase.client.Mutation
 import org.apache.spark.streaming.dstream.DStream
 import java.io._
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat
-import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
 import org.apache.hadoop.fs.{Path, FileSystem}
+import scala.collection.mutable

 /**
  * HBaseContext is a façade for HBase operations
@@ -567,4 +571,284 @@ class HBaseContext(@transient sc: SparkContext,
    */
   private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+  /**
+   * A Spark implementation of HBase bulk load
+   *
+   * This will take the content from an existing RDD then sort and shuffle
+   * it with respect to region splits. The result of that sort and shuffle
+   * will be written to HFiles.
+   *
+   * After this function is executed the user will have to call
+   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+   *
+   * Also note this version of bulk load is different from past versions in
+   * that it includes the qualifier as part of the sort process. The
+   * reason for this is to be able to support rows with a very large number
+   * of columns.
+   *
+   * @param rdd The RDD we are bulk loading from
+   * @param tableName The HBase table we are loading into
+   * @param flatMap A flatMap function that will make every row in the RDD
+   *                into N cells for the bulk load
+   * @param stagingDir The location on the FileSystem to bulk load into
+   * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
+   *                                   column family is written
+   * @param compactionExclude Whether to exclude the generated HFiles from minor compaction
+   * @param maxSize Max size for the HFiles before they roll
+   * @tparam T The type of values in the original RDD
+   */
+  def bulkLoad[T](rdd:RDD[T],
+                  tableName: TableName,
+                  flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
+                  stagingDir:String,
+                  familyHFileWriteOptionsMap:
+                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
+                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+                  compactionExclude: Boolean = false,
+                  maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
+  Unit = {
+    val conn = ConnectionFactory.createConnection(config)
+    val regionLocator = conn.getRegionLocator(tableName)
+    val startKeys = regionLocator.getStartKeys
+    val regionCount = startKeys.length
+    val defaultCompressionStr = config.get("hfile.compression",
+      Compression.Algorithm.NONE.getName)
+    val defaultCompression = HFileWriterImpl
+      .compressionByName(defaultCompressionStr)
+    val now = System.currentTimeMillis()
+    val tableNameByteArray = tableName.getName
+
+    val familyHFileWriteOptionsMapInternal =
+      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
+
+    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
+
+    while (entrySetIt.hasNext) {
+      val entry = entrySetIt.next()
+      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
+    }
+
+    /**
+     * This will return a new HFile writer when requested
+     *
+     * @param family column family
+     * @param conf configuration to connect to HBase
+     * @param favoredNodes nodes that we would like to write to
+     * @param fs FileSystem object where we will be writing the HFiles to
+     * @return WriterLength object
+     */
+    def getNewWriter(family: Array[Byte], conf: Configuration,
+                     favoredNodes: Array[InetSocketAddress],
+                     fs:FileSystem,
+                     familydir:Path): WriterLength = {
+
+
+      var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))
+
+      if (familyOptions == null) {
+        familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
+          BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
+        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
+      }
+
+      val tempConf = new Configuration(conf)
+      tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
+      val contextBuilder = new HFileContextBuilder()
+        .withCompression(Algorithm.valueOf(familyOptions.compression))
+        .withChecksumType(HStore.getChecksumType(conf))
+        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+        .withBlockSize(familyOptions.blockSize)
+      contextBuilder.withDataBlockEncoding(DataBlockEncoding.
+        valueOf(familyOptions.dataBlockEncoding))
+      val hFileContext = contextBuilder.build()
+
+      if (null == favoredNodes) {
+        new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+          .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+          .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
+      } else {
+        new WriterLength(0,
+          new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+            .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+            .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+            .withFavoredNodes(favoredNodes).build())
+      }
+    }
+
+    // Custom partitioner that will partition based on region splits
+    val regionSplitPartitioner = new Partitioner {
+      override def numPartitions: Int = regionCount
+
+      override def getPartition(key: Any): Int = {
+
+        val rowKey:Array[Byte] =
+          if (key.isInstanceOf[KeyFamilyQualifier]) {
+            key.asInstanceOf[KeyFamilyQualifier].rowKey
+          } else {
+            key.asInstanceOf[Array[Byte]]
+          }
+
+        val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
+          override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
+            Bytes.compareTo(o1, o2)
+          }
+        }
+        util.Arrays.binarySearch(startKeys, rowKey, comparator) + 1
+      }
+    }
+
+    //This is where all the magic happens
+    //Here we are going to do the following things
+    // 1. FlatMap every row in the RDD into key column value tuples
+    // 2. Then we are going to repartition, sort and shuffle
+    // 3. Finally we are going to write out our HFiles
+    rdd.flatMap( r => flatMap(r)).
+      repartitionAndSortWithinPartitions(regionSplitPartitioner).
+ hbaseForeachPartition(this, (it, conn) => { + + val conf = broadcastedConf.value.value + val fs = FileSystem.get(conf) + val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength] + var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY + var rollOverRequested = false + + /** + * This will roll all writers + */ + def rollWriters(): Unit = { + writerMap.values.foreach( wl => { + if (wl.writer != null) { + logInfo("Writer=" + wl.writer.getPath + + (if (wl.written == 0) "" else ", wrote=" + wl.written)) + close(wl.writer) + } + }) + writerMap.clear() + rollOverRequested = false + } + + /** + * This function will close a given HFile writer + * @param w The writer to close + */ + def close(w:StoreFile.Writer): Unit = { + if (w != null) { + w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, + Bytes.toBytes(System.currentTimeMillis())) + w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, + Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow))) + w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, + Bytes.toBytes(true)) + w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, + Bytes.toBytes(compactionExclude)) + w.appendTrackedTimestampsToMetadata() + w.close() + } + } + + //Here is where we finally iterate through the data in this partition of the + //RDD that has been sorted and partitioned + it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) => + + //This will get a writer for the column family + //If there is no writer for a given column family then + //it will get created here. + val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), { + + val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family)) + + fs.mkdirs(familyDir) + + val loc:HRegionLocation = { + try { + val locator = + conn.getRegionLocator(TableName.valueOf(tableNameByteArray)) + locator.getRegionLocation(keyFamilyQualifier.rowKey) + } catch { + case e: Throwable => + logWarning("there's something wrong when locating rowkey: " + + Bytes.toString(keyFamilyQualifier.rowKey)) + null + } + } + if (null == loc) { + if (log.isTraceEnabled) { + logTrace("failed to get region location, so use default writer: " + + Bytes.toString(keyFamilyQualifier.rowKey)) + } + getNewWriter(family = keyFamilyQualifier.family, conf = conf, favoredNodes = null, + fs = fs, familydir = familyDir) + } else { + if (log.isDebugEnabled) { + logDebug("first rowkey: [" + Bytes.toString(keyFamilyQualifier.rowKey) + "]") + } + val initialIsa = + new InetSocketAddress(loc.getHostname, loc.getPort) + if (initialIsa.isUnresolved) { + if (log.isTraceEnabled) { + logTrace("failed to resolve bind address: " + loc.getHostname + ":" + + loc.getPort + ", so use default writer") + } + getNewWriter(keyFamilyQualifier.family, conf, null, fs, familyDir) + } else { + if(log.isDebugEnabled) { + logDebug("use favored nodes writer: " + initialIsa.getHostString) + } + getNewWriter(keyFamilyQualifier.family, conf, + Array[InetSocketAddress](initialIsa), fs, familyDir) + } + } + }) + + val keyValue =new KeyValue(keyFamilyQualifier.rowKey, + keyFamilyQualifier.family, + keyFamilyQualifier.qualifier, + now,cellValue) + + wl.writer.append(keyValue) + wl.written += keyValue.getLength + + rollOverRequested = rollOverRequested || wl.written > maxSize + + //This will only roll if we have at least one column family file that is + //bigger then maxSize and we have finished a given row key + if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) { + rollWriters() + } + + previousRow = 
keyFamilyQualifier.rowKey
+      }
+      //We have finished all the data so let's close up the writers
+      rollWriters()
+    })
+  }
+
+  /**
+   * This is a wrapper class around StoreFile.Writer. The reason for the
+   * wrapper is to keep the length of the file alongside the writer
+   *
+   * @param written The number of bytes written to the writer
+   * @param writer The writer to be wrapped
+   */
+  class WriterLength(var written:Long, val writer:StoreFile.Writer)
+
+  class ByteArrayWrapper (val o1:Array[Byte])
+    extends Comparable[ByteArrayWrapper] with Serializable {
+    override def compareTo(o2: ByteArrayWrapper): Int = {
+      Bytes.compareTo(o1,o2.o1)
+    }
+    override def equals(o2: Any): Boolean = {
+      o2 match {
+        case wrapper: ByteArrayWrapper =>
+          Bytes.equals(o1, wrapper.o1)
+        case _ =>
+          false
+      }
+    }
+    override def hashCode():Int = {
+      Bytes.hashCode(o1)
+    }
+  }
 }
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
index fb8456d..42bce40 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
@@ -17,11 +17,15 @@
 package org.apache.hadoop.hbase.spark

-import org.apache.hadoop.hbase.TableName
+import java.util
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hbase.{HConstants, TableName}
 import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.spark.rdd.RDD
+import scala.collection.immutable.HashMap
 import scala.reflect.ClassTag

 /**
@@ -158,5 +162,44 @@ object HBaseRDDFunctions
     RDD[R] = {
       hc.mapPartitions[T,R](rdd, f)
     }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * bulkLoad method.
+     *
+     * A Spark implementation of HBase bulk load
+     *
+     * This will take the content from an existing RDD then sort and shuffle
+     * it with respect to region splits. The result of that sort and shuffle
+     * will be written to HFiles.
+     *
+     * After this function is executed the user will have to call
+     * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+     *
+     * Also note this version of bulk load is different from past versions in
+     * that it includes the qualifier as part of the sort process. The
+     * reason for this is to be able to support rows with a very large number
+     * of columns.
+     *
+     * @param tableName The HBase table we are loading into
+     * @param flatMap A flatMap function that will make every row in the RDD into N cells for the bulk load
+     * @param stagingDir The location on the FileSystem to bulk load into
+     * @param familyHFileWriteOptionsMap Options that will define how the HFile for a column family is written
+     * @param compactionExclude Whether to exclude the generated HFiles from minor compaction
+     * @param maxSize Max size for the HFiles before they roll
+     */
+    def hbaseBulkLoad(hc: HBaseContext,
+                      tableName: TableName,
+                      flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
+                      stagingDir:String,
+                      familyHFileWriteOptionsMap:
+                      util.Map[Array[Byte], FamilyHFileWriteOptions] =
+                      new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
+                      compactionExclude: Boolean = false,
+                      maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
+      hc.bulkLoad(rdd, tableName,
+        flatMap, stagingDir, familyHFileWriteOptionsMap,
+        compactionExclude, maxSize)
+    }
   }
 }
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
new file mode 100644
index 0000000..b2eda2c
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.io.Serializable
+
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * This is the key to be used for sorting and shuffling.
+ *
+ * We will only partition on the rowKey but we will sort on all three
+ *
+ * @param rowKey Record RowKey
+ * @param family Record ColumnFamily
+ * @param qualifier Cell Qualifier
+ */
+class KeyFamilyQualifier(val rowKey:Array[Byte], val family:Array[Byte], val qualifier:Array[Byte])
+  extends Comparable[KeyFamilyQualifier] with Serializable {
+  override def compareTo(o: KeyFamilyQualifier): Int = {
+    var result = Bytes.compareTo(rowKey, o.rowKey)
+    if (result == 0) {
+      result = Bytes.compareTo(family, o.family)
+      if (result == 0) result = Bytes.compareTo(qualifier, o.qualifier)
+    }
+    result
+  }
+  override def toString: String = {
+    Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
+  }
+}
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
new file mode 100644
index 0000000..11fcd2e
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hbase.client.{Get, ConnectionFactory} +import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile} +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles +import org.apache.hadoop.hbase.{HConstants, CellUtil, HBaseTestingUtility, TableName} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.spark.{SparkContext, Logging} +import org.junit.rules.TemporaryFolder +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} + +class BulkLoadSuite extends FunSuite with +BeforeAndAfterEach with BeforeAndAfterAll with Logging { + @transient var sc: SparkContext = null + var TEST_UTIL = new HBaseTestingUtility + + val tableName = "t1" + val columnFamily1 = "f1" + val columnFamily2 = "f2" + val testFolder = new TemporaryFolder() + + + override def beforeAll() { + TEST_UTIL.startMiniCluster() + logInfo(" - minicluster started") + + try { + TEST_UTIL.deleteTable(TableName.valueOf(tableName)) + } catch { + case e: Exception => + logInfo(" - no table " + tableName + " found") + } + + logInfo(" - created table") + + val envMap = Map[String,String](("Xmx", "512m")) + + sc = new SparkContext("local", "test", null, Nil, envMap) + } + + override def afterAll() { + logInfo("shuting down minicluster") + TEST_UTIL.shutdownMiniCluster() + logInfo(" - minicluster shut down") + TEST_UTIL.cleanupTestDir() + sc.stop() + } + + test("bulkLoad to test HBase client: Basic Test multi family " + + "and multi column tests with all default HFile Configs") { + val config = TEST_UTIL.getConfiguration + + logInfo(" - creating table " + tableName) + TEST_UTIL.createTable(TableName.valueOf(tableName), + Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2))) + + //There are a number of tests in here. + // 1. Row keys are not in order + // 2. Qualifiers are not in order + // 3. Column Families are not in order + // 4. There are tests for records in one column family and some in two column families + // 5. 
There are records will a single qualifier and some with two + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c")))), + (Bytes.toBytes("5"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))), + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))), + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))) + + val hbaseContext = new HBaseContext(sc, config) + + testFolder.create() + val stagingFolder = testFolder.newFolder() + + hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + TableName.valueOf(tableName), + t => { + val rowKey = t._1 + val family:Array[Byte] = t._2(0)._1 + val qualifier = t._2(0)._2 + val value = t._2(0)._3 + + val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) + + Seq((keyFamilyQualifier, value)).iterator + }, + stagingFolder.getPath) + + val fs = FileSystem.get(config) + assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2) + + val conn = ConnectionFactory.createConnection(config) + + val load = new LoadIncrementalHFiles(config) + val table = conn.getTable(TableName.valueOf(tableName)) + try { + load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table, + conn.getRegionLocator(TableName.valueOf(tableName))) + + val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells() + assert(cells5.size == 1) + assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3")) + assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a")) + + val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells() + assert(cells4.size == 2) + assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2")) + assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b")) + + val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells() + assert(cells3.size == 3) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a")) + 
assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b")) + + + val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells() + assert(cells2.size == 2) + assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2")) + assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b")) + + val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells() + assert(cells1.size == 1) + assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a")) + + } finally { + table.close() + val admin = ConnectionFactory.createConnection(config).getAdmin + try { + admin.disableTable(TableName.valueOf(tableName)) + admin.deleteTable(TableName.valueOf(tableName)) + } finally { + admin.close() + } + fs.delete(new Path(stagingFolder.getPath), true) + + testFolder.delete() + + } + } + + test("bulkLoad to test HBase client: Test Roll Over") { + val config = TEST_UTIL.getConfiguration + + logInfo(" - creating table " + tableName) + TEST_UTIL.createTable(TableName.valueOf(tableName), + Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2))) + + //There are a number of tests in here. + // 1. Row keys are not in order + // 2. Qualifiers are not in order + // 3. Column Families are not in order + // 4. There are tests for records in one column family and some in two column families + // 5. 
There are records will a single qualifier and some with two + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c")))), + (Bytes.toBytes("5"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))), + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))), + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))) + + val hbaseContext = new HBaseContext(sc, config) + + testFolder.create() + val stagingFolder = testFolder.newFolder() + + rdd.hbaseBulkLoad(hbaseContext, + TableName.valueOf(tableName), + t => { + val rowKey = t._1 + val family:Array[Byte] = t._2(0)._1 + val qualifier = t._2(0)._2 + val value = t._2(0)._3 + + val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) + + Seq((keyFamilyQualifier, value)).iterator + }, + stagingFolder.getPath, + new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions], + compactionExclude = false, + 20) + + val fs = FileSystem.get(config) + assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 1) + + assert(fs.listStatus(new Path(stagingFolder.getPath+ "/f1")).length == 5) + + val conn = ConnectionFactory.createConnection(config) + + val load = new LoadIncrementalHFiles(config) + val table = conn.getTable(TableName.valueOf(tableName)) + try { + load.doBulkLoad(new Path(stagingFolder.getPath), + conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) + + val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells() + assert(cells5.size == 1) + assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3")) + assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a")) + + val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells() + assert(cells4.size == 2) + assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2")) + assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b")) + + val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells() + assert(cells3.size == 3) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.a")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("b")) 
+ assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.c")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("c")) + + val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells() + assert(cells2.size == 2) + assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2")) + assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b")) + + val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells() + assert(cells1.size == 1) + assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a")) + + } finally { + table.close() + val admin = ConnectionFactory.createConnection(config).getAdmin + try { + admin.disableTable(TableName.valueOf(tableName)) + admin.deleteTable(TableName.valueOf(tableName)) + } finally { + admin.close() + } + fs.delete(new Path(stagingFolder.getPath), true) + + testFolder.delete() + } + } + + test("bulkLoad to test HBase client: Basic Test multi family and multi column tests" + + " with one column family with custom configs") { + val config = TEST_UTIL.getConfiguration + + logInfo(" - creating table " + tableName) + TEST_UTIL.createTable(TableName.valueOf(tableName), + Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2))) + + //There are a number of tests in here. + // 1. Row keys are not in order + // 2. Qualifiers are not in order + // 3. Column Families are not in order + // 4. There are tests for records in one column family and some in two column families + // 5. 
There are records will a single qualifier and some with two + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c")))), + (Bytes.toBytes("5"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))), + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))), + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))) + + val hbaseContext = new HBaseContext(sc, config) + + testFolder.create() + val stagingFolder = testFolder.newFolder() + + val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions] + + val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, + "PREFIX") + + familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options) + + hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + TableName.valueOf(tableName), + t => { + val rowKey = t._1 + val family:Array[Byte] = t._2(0)._1 + val qualifier = t._2(0)._2 + val value = t._2(0)._3 + + val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) + + Seq((keyFamilyQualifier, value)).iterator + }, + stagingFolder.getPath, + familyHBaseWriterOptions, + compactionExclude = false, + HConstants.DEFAULT_MAX_FILE_SIZE) + + val fs = FileSystem.get(config) + assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2) + + val f1FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f1")) + for ( i <- 0 until f1FileList.length) { + val reader = HFile.createReader(fs, f1FileList(i).getPath, + new CacheConfig(config), config) + assert(reader.getCompressionAlgorithm.getName.equals("gz")) + assert(reader.getDataBlockEncoding.name().equals("PREFIX")) + } + + val f2FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f2")) + for ( i <- 0 until f2FileList.length) { + val reader = HFile.createReader(fs, f2FileList(i).getPath, + new CacheConfig(config), config) + assert(reader.getCompressionAlgorithm.getName.equals("none")) + assert(reader.getDataBlockEncoding.name().equals("NONE")) + } + + val conn = ConnectionFactory.createConnection(config) + + val load = new LoadIncrementalHFiles(config) + val table = conn.getTable(TableName.valueOf(tableName)) + try { + load.doBulkLoad(new Path(stagingFolder.getPath), + conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) + + val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells() + assert(cells5.size == 1) + assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3")) + assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a")) + + val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells() + assert(cells4.size == 2) + assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1")) + 
assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2")) + assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b")) + + val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells() + assert(cells3.size == 3) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b")) + + + val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells() + assert(cells2.size == 2) + assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2")) + assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b")) + + val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells() + assert(cells1.size == 1) + assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a")) + + } finally { + table.close() + val admin = ConnectionFactory.createConnection(config).getAdmin + try { + admin.disableTable(TableName.valueOf(tableName)) + admin.deleteTable(TableName.valueOf(tableName)) + } finally { + admin.close() + } + fs.delete(new Path(stagingFolder.getPath), true) + + testFolder.delete() + + } + } + +} diff --git a/src/main/asciidoc/_chapters/images b/src/main/asciidoc/_chapters/images deleted file mode 120000 index 1e0c6c1..0000000 --- a/src/main/asciidoc/_chapters/images +++ /dev/null @@ -1 +0,0 @@ -../../site/resources/images \ No newline at end of file diff --git a/src/main/asciidoc/images b/src/main/asciidoc/images deleted file mode 120000 index 06d04d0..0000000 --- a/src/main/asciidoc/images +++ /dev/null @@ -1 +0,0 @@ -../site/resources/images \ No newline at end of file
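End-to-end usage sketch (not part of the patch): the snippet below mirrors the pattern exercised in BulkLoadSuite, assuming an existing SparkContext `sc`, an HBase Configuration `config`, and a pre-created table "t1" with column family "f1"; the staging path is hypothetical. It shows the two required steps: `bulkLoad` sorts/shuffles the data and writes HFiles into the staging directory, then `LoadIncrementalHFiles.doBulkLoad` moves them into the table.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.util.Bytes

// (rowKey, family, qualifier, value) tuples to load
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"), Bytes.toBytes("f1"), Bytes.toBytes("a"), Bytes.toBytes("foo1")),
  (Bytes.toBytes("2"), Bytes.toBytes("f1"), Bytes.toBytes("b"), Bytes.toBytes("foo2"))))

val hbaseContext = new HBaseContext(sc, config)
val stagingDir = "/tmp/bulkLoadStaging"  // hypothetical staging location on the cluster FileSystem

// Step 1: sort/shuffle by (rowKey, family, qualifier) and write HFiles under stagingDir.
// A familyHFileWriteOptionsMap could also be passed here to set compression, bloom type,
// block size and data block encoding per family (see the "custom configs" test above).
hbaseContext.bulkLoad[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])](rdd,
  TableName.valueOf("t1"),
  t => Seq((new KeyFamilyQualifier(t._1, t._2, t._3), t._4)).iterator,
  stagingDir)

// Step 2: hand the generated HFiles to HBase.
val conn = ConnectionFactory.createConnection(config)
try {
  val table = conn.getTable(TableName.valueOf("t1"))
  try {
    new LoadIncrementalHFiles(config).doBulkLoad(new Path(stagingDir),
      conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf("t1")))
  } finally {
    table.close()
  }
} finally {
  conn.close()
}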