diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala index c51a3af..39d3403 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala @@ -36,19 +36,21 @@ class BulkLoadPartitioner(startKeys:Array[Array[Byte]]) override def getPartition(key: Any): Int = { + val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] { + override def compare(o1: Array[Byte], o2: Array[Byte]): Int = { + Bytes.compareTo(o1, o2) + } + } + val rowKey:Array[Byte] = key match { case qualifier: KeyFamilyQualifier => qualifier.rowKey + case wrapper: ByteArrayWrapper => + wrapper.value case _ => key.asInstanceOf[Array[Byte]] } - - val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] { - override def compare(o1: Array[Byte], o2: Array[Byte]): Int = { - Bytes.compareTo(o1, o2) - } - } val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator) if (partition < 0) partition * -1 + -2 else partition diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala new file mode 100644 index 0000000..9167f75 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark + +import java.io.Serializable + +import org.apache.hadoop.hbase.util.Bytes + +/** + * This is a wrapper over a byte array so it can work as + * a key in a hashMap + * + * @param value The Byte Array value + */ +class ByteArrayWrapper (var value:Array[Byte]) + extends Comparable[ByteArrayWrapper] with Serializable { + override def compareTo(valueOther: ByteArrayWrapper): Int = { + Bytes.compareTo(value,valueOther.value) + } + override def equals(o2: Any): Boolean = { + o2 match { + case wrapper: ByteArrayWrapper => + Bytes.equals(value, wrapper.value) + case _ => + false + } + } + override def hashCode():Int = { + Bytes.hashCode(value) + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala new file mode 100644 index 0000000..33b3609 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark + +import java.util + +/** + * This object is a clean way to store and sort all cells that will be bulk + * loaded into a single row + */ +class FamiliesQualifiersValues extends Serializable { + //Tree maps are used because we need the results to + // be sorted when we read them + val familyMap = new util.TreeMap[ByteArrayWrapper, + util.TreeMap[ByteArrayWrapper, Array[Byte]]]() + + //normally in a row there are more columns then + //column families this wrapper is reused for column + //family look ups + val reusableWrapper = new ByteArrayWrapper(null) + + /** + * Adds a new cell to an existing row + * @param family HBase column family + * @param qualifier HBase column qualifier + * @param value HBase cell value + */ + def += (family: Array[Byte], qualifier: Array[Byte], value: Array[Byte]): Unit = { + + reusableWrapper.value = family + + var qualifierValues = familyMap.get(reusableWrapper) + + if (qualifierValues == null) { + qualifierValues = new util.TreeMap[ByteArrayWrapper, Array[Byte]]() + familyMap.put(new ByteArrayWrapper(family), qualifierValues) + } + + qualifierValues.put(new ByteArrayWrapper(qualifier), value) + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala index 57ae6b0..f8c24f2 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.spark import java.net.InetSocketAddress import java.util +import javax.management.openmbean.KeyAlreadyExistsException import org.apache.hadoop.hbase.fs.HFileSystem import org.apache.hadoop.hbase._ @@ -575,7 +576,8 @@ class HBaseContext(@transient sc: SparkContext, def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] /** - * A Spark Implementation of HBase Bulk load + * Spark Implementation of HBase Bulk load for wide rows or when + * values are not already combined at the time of the map process * * This will take the content from an existing RDD then sort and shuffle * it with respect to region splits. 
The result of that sort and shuffle @@ -616,10 +618,10 @@ class HBaseContext(@transient sc: SparkContext, val startKeys = regionLocator.getStartKeys val defaultCompressionStr = config.get("hfile.compression", Compression.Algorithm.NONE.getName) - val defaultCompression = HFileWriterImpl + val hfileCompression = HFileWriterImpl .compressionByName(defaultCompressionStr) - val now = System.currentTimeMillis() - val tableNameByteArray = tableName.getName + val nowTimeStamp = System.currentTimeMillis() + val tableRawName = tableName.getName val familyHFileWriteOptionsMapInternal = new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions] @@ -631,53 +633,6 @@ class HBaseContext(@transient sc: SparkContext, familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue) } - /** - * This will return a new HFile writer when requested - * - * @param family column family - * @param conf configuration to connect to HBase - * @param favoredNodes nodes that we would like to write too - * @param fs FileSystem object where we will be writing the HFiles to - * @return WriterLength object - */ - def getNewWriter(family: Array[Byte], conf: Configuration, - favoredNodes: Array[InetSocketAddress], - fs:FileSystem, - familydir:Path): WriterLength = { - - - var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family)) - - if (familyOptions == null) { - familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString, - BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString) - familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions) - } - - val tempConf = new Configuration(conf) - tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f) - val contextBuilder = new HFileContextBuilder() - .withCompression(Algorithm.valueOf(familyOptions.compression)) - .withChecksumType(HStore.getChecksumType(conf)) - .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) - .withBlockSize(familyOptions.blockSize) - contextBuilder.withDataBlockEncoding(DataBlockEncoding. 
- valueOf(familyOptions.dataBlockEncoding)) - val hFileContext = contextBuilder.build() - - if (null == favoredNodes) { - new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs) - .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType)) - .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build()) - } else { - new WriterLength(0, - new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) - .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType)) - .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) - .withFavoredNodes(favoredNodes).build()) - } - } - val regionSplitPartitioner = new BulkLoadPartitioner(startKeys) @@ -695,150 +650,422 @@ class HBaseContext(@transient sc: SparkContext, val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength] var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY var rollOverRequested = false - - /** - * This will roll all writers - */ - def rollWriters(): Unit = { - writerMap.values.foreach( wl => { - if (wl.writer != null) { - logDebug("Writer=" + wl.writer.getPath + - (if (wl.written == 0) "" else ", wrote=" + wl.written)) - close(wl.writer) - } - }) - writerMap.clear() - rollOverRequested = false - } - - /** - * This function will close a given HFile writer - * @param w The writer to close - */ - def close(w:StoreFile.Writer): Unit = { - if (w != null) { - w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, - Bytes.toBytes(System.currentTimeMillis())) - w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, - Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow))) - w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, - Bytes.toBytes(true)) - w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, - Bytes.toBytes(compactionExclude)) - w.appendTrackedTimestampsToMetadata() - w.close() - } - } + val localTableName = TableName.valueOf(tableRawName) //Here is where we finally iterate through the data in this partition of the //RDD that has been sorted and partitioned it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) => - //This will get a writer for the column family - //If there is no writer for a given column family then - //it will get created here. 
- val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), { - - val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family)) - - fs.mkdirs(familyDir) - - val loc:HRegionLocation = { - try { - val locator = - conn.getRegionLocator(TableName.valueOf(tableNameByteArray)) - locator.getRegionLocation(keyFamilyQualifier.rowKey) - } catch { - case e: Throwable => - logWarning("there's something wrong when locating rowkey: " + - Bytes.toString(keyFamilyQualifier.rowKey)) - null - } - } - if (null == loc) { - if (log.isTraceEnabled) { - logTrace("failed to get region location, so use default writer: " + - Bytes.toString(keyFamilyQualifier.rowKey)) - } - getNewWriter(family = keyFamilyQualifier.family, conf = conf, favoredNodes = null, - fs = fs, familydir = familyDir) - } else { - if (log.isDebugEnabled) { - logDebug("first rowkey: [" + Bytes.toString(keyFamilyQualifier.rowKey) + "]") - } - val initialIsa = - new InetSocketAddress(loc.getHostname, loc.getPort) - if (initialIsa.isUnresolved) { - if (log.isTraceEnabled) { - logTrace("failed to resolve bind address: " + loc.getHostname + ":" - + loc.getPort + ", so use default writer") - } - getNewWriter(keyFamilyQualifier.family, conf, null, fs, familyDir) - } else { - if(log.isDebugEnabled) { - logDebug("use favored nodes writer: " + initialIsa.getHostString) - } - getNewWriter(keyFamilyQualifier.family, conf, - Array[InetSocketAddress](initialIsa), fs, familyDir) - } - } - }) - - val keyValue =new KeyValue(keyFamilyQualifier.rowKey, + val wl = writeValueToHFile(keyFamilyQualifier.rowKey, keyFamilyQualifier.family, keyFamilyQualifier.qualifier, - now,cellValue) - - wl.writer.append(keyValue) - wl.written += keyValue.getLength + cellValue, + nowTimeStamp, + fs, + conn, + localTableName, + conf, + familyHFileWriteOptionsMapInternal, + hfileCompression, + writerMap, + stagingDir) rollOverRequested = rollOverRequested || wl.written > maxSize //This will only roll if we have at least one column family file that is //bigger then maxSize and we have finished a given row key if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) { - rollWriters() + rollWriters(writerMap, + regionSplitPartitioner, + previousRow, + compactionExclude) + rollOverRequested = false } previousRow = keyFamilyQualifier.rowKey } //We have finished all the data so lets close up the writers - rollWriters() + rollWriters(writerMap, + regionSplitPartitioner, + previousRow, + compactionExclude) + rollOverRequested = false }) } /** - * This is a wrapper class around StoreFile.Writer. The reason for the - * wrapper is to keep the length of the file along side the writer + * Spark Implementation of HBase Bulk load for short rows some where less then + * a 1000 columns. This bulk load should be faster for tables will thinner + * rows then the other spark implementation of bulk load that puts only one + * value into a record going into a shuffle * - * @param written The writer to be wrapped - * @param writer The number of bytes written to the writer + * This will take the content from an existing RDD then sort and shuffle + * it with respect to region splits. The result of that sort and shuffle + * will be written to HFiles. + * + * After this function is executed the user will have to call + * LoadIncrementalHFiles.doBulkLoad(...) 
to move the files into HBase + * + * In this implementation, only the rowKey is given to the shuffle as the key + * and all the columns are already linked to the RowKey before the shuffle + * stage. The sorting of the qualifier is done in memory out side of the + * shuffle stage + * + * Also make sure that incoming RDDs only have one record for every row key. + * + * @param rdd The RDD we are bulk loading from + * @param tableName The HBase table we are loading into + * @param mapFunction A function that will convert the RDD records to + * the key value format used for the shuffle to prep + * for writing to the bulk loaded HFiles + * @param stagingDir The location on the FileSystem to bulk load into + * @param familyHFileWriteOptionsMap Options that will define how the HFile for a + * column family is written + * @param compactionExclude Compaction excluded for the HFiles + * @param maxSize Max size for the HFiles before they roll + * @tparam T The Type of values in the original RDD */ - class WriterLength(var written:Long, val writer:StoreFile.Writer) + def bulkLoadThinRows[T](rdd:RDD[T], + tableName: TableName, + mapFunction: (T) => + (ByteArrayWrapper, FamiliesQualifiersValues), + stagingDir:String, + familyHFileWriteOptionsMap: + util.Map[Array[Byte], FamilyHFileWriteOptions] = + new util.HashMap[Array[Byte], FamilyHFileWriteOptions], + compactionExclude: Boolean = false, + maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE): + Unit = { + val conn = ConnectionFactory.createConnection(config) + val regionLocator = conn.getRegionLocator(tableName) + val startKeys = regionLocator.getStartKeys + val defaultCompressionStr = config.get("hfile.compression", + Compression.Algorithm.NONE.getName) + val defaultCompression = HFileWriterImpl + .compressionByName(defaultCompressionStr) + val nowTimeStamp = System.currentTimeMillis() + val tableRawName = tableName.getName + + val familyHFileWriteOptionsMapInternal = + new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions] + + val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator() + + while (entrySetIt.hasNext) { + val entry = entrySetIt.next() + familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue) + } + + val regionSplitPartitioner = + new BulkLoadPartitioner(startKeys) + + //This is where all the magic happens + //Here we are going to do the following things + // 1. FlapMap every row in the RDD into key column value tuples + // 2. Then we are going to repartition sort and shuffle + // 3. Finally we are going to write out our HFiles + rdd.map( r => mapFunction(r)). + repartitionAndSortWithinPartitions(regionSplitPartitioner). 
+ hbaseForeachPartition(this, (it, conn) => { + + val conf = broadcastedConf.value.value + val fs = FileSystem.get(conf) + val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength] + var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY + var rollOverRequested = false + val localTableName = TableName.valueOf(tableRawName) + + //Here is where we finally iterate through the data in this partition of the + //RDD that has been sorted and partitioned + it.foreach{ case (rowKey:ByteArrayWrapper, + familiesQualifiersValues:FamiliesQualifiersValues) => + + + if (Bytes.compareTo(previousRow, rowKey.value) == 0) { + throw new KeyAlreadyExistsException("The following key was sent to the " + + "HFile load more then one: " + Bytes.toString(previousRow)) + } + + //The family map is a tree map so the families will be sorted + val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator() + while (familyIt.hasNext) { + val familyEntry = familyIt.next() + + val family = familyEntry.getKey.value + + val qualifierIt = familyEntry.getValue.entrySet().iterator() + + //The qualifier map is a tree map so the families will be sorted + while (qualifierIt.hasNext) { + + val qualifierEntry = qualifierIt.next() + val qualifier = qualifierEntry.getKey + val cellValue = qualifierEntry.getValue + + writeValueToHFile(rowKey.value, + family, + qualifier.value, // qualifier + cellValue, // value + nowTimeStamp, + fs, + conn, + localTableName, + conf, + familyHFileWriteOptionsMapInternal, + defaultCompression, + writerMap, + stagingDir) + + previousRow = rowKey.value + } + + writerMap.values.foreach( wl => { + rollOverRequested = rollOverRequested || wl.written > maxSize + + //This will only roll if we have at least one column family file that is + //bigger then maxSize and we have finished a given row key + if (rollOverRequested) { + rollWriters(writerMap, + regionSplitPartitioner, + previousRow, + compactionExclude) + rollOverRequested = false + } + }) + } + } + + //This will get a writer for the column family + //If there is no writer for a given column family then + //it will get created here. 
+ //We have finished all the data so lets close up the writers + rollWriters(writerMap, + regionSplitPartitioner, + previousRow, + compactionExclude) + rollOverRequested = false + }) + } /** - * This is a wrapper over a byte array so it can work as - * a key in a hashMap + * This will return a new HFile writer when requested * - * @param o1 The Byte Array value + * @param family column family + * @param conf configuration to connect to HBase + * @param favoredNodes nodes that we would like to write too + * @param fs FileSystem object where we will be writing the HFiles to + * @return WriterLength object */ - class ByteArrayWrapper (val o1:Array[Byte]) - extends Comparable[ByteArrayWrapper] with Serializable { - override def compareTo(o2: ByteArrayWrapper): Int = { - Bytes.compareTo(o1,o2.o1) + private def getNewHFileWriter(family: Array[Byte], conf: Configuration, + favoredNodes: Array[InetSocketAddress], + fs:FileSystem, + familydir:Path, + familyHFileWriteOptionsMapInternal: + util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions], + defaultCompression:Compression.Algorithm): WriterLength = { + + + var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family)) + + if (familyOptions == null) { + familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString, + BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString) + familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions) } - override def equals(o2: Any): Boolean = { - o2 match { - case wrapper: ByteArrayWrapper => - Bytes.equals(o1, wrapper.o1) - case _ => - false - } + + val tempConf = new Configuration(conf) + tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f) + val contextBuilder = new HFileContextBuilder() + .withCompression(Algorithm.valueOf(familyOptions.compression)) + .withChecksumType(HStore.getChecksumType(conf)) + .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) + .withBlockSize(familyOptions.blockSize) + contextBuilder.withDataBlockEncoding(DataBlockEncoding. 
+ valueOf(familyOptions.dataBlockEncoding)) + val hFileContext = contextBuilder.build() + + if (null == favoredNodes) { + new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs) + .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType)) + .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build()) + } else { + new WriterLength(0, + new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) + .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType)) + .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) + .withFavoredNodes(favoredNodes).build()) } - override def hashCode():Int = { - Bytes.hashCode(o1) + } + + /** + * Encompasses the logic to write a value to an HFile + * + * @param rowKey The RowKey for the record + * @param family HBase column family for the record + * @param qualifier HBase column qualifier for the record + * @param cellValue HBase cell value + * @param nowTimeStamp The cell time stamp + * @param fs Connection to the FileSystem for the HFile + * @param conn Connection to HBaes + * @param tableName HBase TableName object + * @param conf Configuration to be used when making a new HFile + * @param familyHFileWriteOptionsMapInternal Extra configs for the HFile + * @param hfileCompression The compression codec for the new HFile + * @param writerMap HashMap of existing writers and their offsets + * @param stagingDir The staging directory on the FileSystem to store + * the HFiles + * @return The writer for the given HFile that was writen + * too + */ + private def writeValueToHFile(rowKey: Array[Byte], + family: Array[Byte], + qualifier: Array[Byte], + cellValue:Array[Byte], + nowTimeStamp: Long, + fs: FileSystem, + conn: Connection, + tableName: TableName, + conf: Configuration, + familyHFileWriteOptionsMapInternal: + util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions], + hfileCompression:Compression.Algorithm, + writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength], + stagingDir: String + ): WriterLength = { + + val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(family), { + val familyDir = new Path(stagingDir, Bytes.toString(family)) + + fs.mkdirs(familyDir) + + val loc:HRegionLocation = { + try { + val locator = + conn.getRegionLocator(tableName) + locator.getRegionLocation(rowKey) + } catch { + case e: Throwable => + logWarning("there's something wrong when locating rowkey: " + + Bytes.toString(rowKey)) + null + } + } + if (null == loc) { + if (log.isTraceEnabled) { + logTrace("failed to get region location, so use default writer: " + + Bytes.toString(rowKey)) + } + getNewHFileWriter(family = family, + conf = conf, + favoredNodes = null, + fs = fs, + familydir = familyDir, + familyHFileWriteOptionsMapInternal, + hfileCompression) + } else { + if (log.isDebugEnabled) { + logDebug("first rowkey: [" + Bytes.toString(rowKey) + "]") + } + val initialIsa = + new InetSocketAddress(loc.getHostname, loc.getPort) + if (initialIsa.isUnresolved) { + if (log.isTraceEnabled) { + logTrace("failed to resolve bind address: " + loc.getHostname + ":" + + loc.getPort + ", so use default writer") + } + getNewHFileWriter(family, + conf, + null, + fs, + familyDir, + familyHFileWriteOptionsMapInternal, + hfileCompression) + } else { + if(log.isDebugEnabled) { + logDebug("use favored nodes writer: " + initialIsa.getHostString) + } + getNewHFileWriter(family, + conf, + Array[InetSocketAddress](initialIsa), + fs, + familyDir, + 
familyHFileWriteOptionsMapInternal, + hfileCompression) + } + } + }) + + val keyValue =new KeyValue(rowKey, + family, + qualifier, + nowTimeStamp,cellValue) + + wl.writer.append(keyValue) + wl.written += keyValue.getLength + + wl + } + + /** + * This will roll all Writers + * @param writerMap HashMap that contains all the writers + * @param regionSplitPartitioner The partitioner with knowledge of how the + * Region's are split by row key + * @param previousRow The last row to fill the HFile ending range metadata + * @param compactionExclude The exclude compaction metadata flag for the HFile + */ + private def rollWriters(writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength], + regionSplitPartitioner: BulkLoadPartitioner, + previousRow: Array[Byte], + compactionExclude: Boolean): Unit = { + writerMap.values.foreach( wl => { + if (wl.writer != null) { + logDebug("Writer=" + wl.writer.getPath + + (if (wl.written == 0) "" else ", wrote=" + wl.written)) + closeHFileWriter(wl.writer, + regionSplitPartitioner, + previousRow, + compactionExclude) + } + }) + writerMap.clear() + + } + + /** + * Function to close an HFile + * @param w HFile Writer + * @param regionSplitPartitioner The partitioner with knowledge of how the + * Region's are split by row key + * @param previousRow The last row to fill the HFile ending range metadata + * @param compactionExclude The exclude compaction metadata flag for the HFile + */ + private def closeHFileWriter(w: StoreFile.Writer, + regionSplitPartitioner: BulkLoadPartitioner, + previousRow: Array[Byte], + compactionExclude: Boolean): Unit = { + if (w != null) { + w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, + Bytes.toBytes(System.currentTimeMillis())) + w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, + Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow))) + w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, + Bytes.toBytes(true)) + w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, + Bytes.toBytes(compactionExclude)) + w.appendTrackedTimestampsToMetadata() + w.close() } } + + /** + * This is a wrapper class around StoreFile.Writer. The reason for the + * wrapper is to keep the length of the file along side the writer + * + * @param written The writer to be wrapped + * @param writer The number of bytes written to the writer + */ + class WriterLength(var written:Long, val writer:StoreFile.Writer) } object LatestHBaseContextCache { diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala index 7c59145..601642a 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala @@ -19,13 +19,11 @@ package org.apache.hadoop.hbase.spark import java.util -import org.apache.hadoop.fs.Path import org.apache.hadoop.hbase.{HConstants, TableName} import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark.rdd.RDD -import scala.collection.immutable.HashMap import scala.reflect.ClassTag /** @@ -164,8 +162,8 @@ object HBaseRDDFunctions } /** - * Implicit method that gives easy access to HBaseContext's - * bulkLoad method. 
+ * Spark Implementation of HBase Bulk load for wide rows or when + * values are not already combined at the time of the map process * * A Spark Implementation of HBase Bulk load * @@ -203,5 +201,51 @@ object HBaseRDDFunctions flatMap, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize) } + + /** + * Implicit method that gives easy access to HBaseContext's + * bulkLoadThinRows method. + * + * Spark Implementation of HBase Bulk load for short rows some where less then + * a 1000 columns. This bulk load should be faster for tables will thinner + * rows then the other spark implementation of bulk load that puts only one + * value into a record going into a shuffle + * + * This will take the content from an existing RDD then sort and shuffle + * it with respect to region splits. The result of that sort and shuffle + * will be written to HFiles. + * + * After this function is executed the user will have to call + * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase + * + * In this implementation only the rowKey is given to the shuffle as the key + * and all the columns are already linked to the RowKey before the shuffle + * stage. The sorting of the qualifier is done in memory out side of the + * shuffle stage + * + * @param tableName The HBase table we are loading into + * @param mapFunction A function that will convert the RDD records to + * the key value format used for the shuffle to prep + * for writing to the bulk loaded HFiles + * @param stagingDir The location on the FileSystem to bulk load into + * @param familyHFileWriteOptionsMap Options that will define how the HFile for a + * column family is written + * @param compactionExclude Compaction excluded for the HFiles + * @param maxSize Max size for the HFiles before they roll + */ + def hbaseBulkLoadThinRows(hc: HBaseContext, + tableName: TableName, + mapFunction: (T) => + (ByteArrayWrapper, FamiliesQualifiersValues), + stagingDir:String, + familyHFileWriteOptionsMap: + util.Map[Array[Byte], FamilyHFileWriteOptions] = + new util.HashMap[Array[Byte], FamilyHFileWriteOptions](), + compactionExclude: Boolean = false, + maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = { + hc.bulkLoadThinRows(rdd, tableName, + mapFunction, stagingDir, familyHFileWriteOptionsMap, + compactionExclude, maxSize) + } } } diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala index 2e5381a..795ce6d 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala @@ -65,8 +65,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { sc.stop() } - test("Basic Test multi family and multi column tests " + - "with all default HFile Configs") { + test("Wide Row Bulk Load: Test multi family and multi column tests " + + "with all default HFile Configs.") { val config = TEST_UTIL.getConfiguration logInfo(" - creating table " + tableName) @@ -81,36 +81,38 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { // 5. 
There are records will a single qualifier and some with two val rdd = sc.parallelize(Array( (Bytes.toBytes("1"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), (Bytes.toBytes("3"), - Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a")))), + (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a"))), (Bytes.toBytes("3"), - Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b")))), + (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b"))), (Bytes.toBytes("3"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c"))), (Bytes.toBytes("5"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))), (Bytes.toBytes("4"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))), (Bytes.toBytes("4"), - Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))), + (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))), (Bytes.toBytes("2"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))), (Bytes.toBytes("2"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))) + (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))) + + val hbaseContext = new HBaseContext(sc, config) testFolder.create() val stagingFolder = testFolder.newFolder() - hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + hbaseContext.bulkLoad[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))](rdd, TableName.valueOf(tableName), t => { val rowKey = t._1 - val family:Array[Byte] = t._2(0)._1 - val qualifier = t._2(0)._2 - val value = t._2(0)._3 + val family:Array[Byte] = t._2._1 + val qualifier = t._2._2 + val value:Array[Byte] = t._2._3 val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) @@ -188,7 +190,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { } } - test("bulkLoad to test HBase client: Test Roll Over and " + + test("Wide Row Bulk Load: Test HBase client: Test Roll Over and " + "using an implicit call to bulk load") { val config = TEST_UTIL.getConfiguration @@ -204,23 +206,23 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { // 5. 
There are records will a single qualifier and some with two val rdd = sc.parallelize(Array( (Bytes.toBytes("1"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), (Bytes.toBytes("3"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), (Bytes.toBytes("3"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a"))), (Bytes.toBytes("3"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c"))), (Bytes.toBytes("5"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))), (Bytes.toBytes("4"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))), (Bytes.toBytes("4"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))), (Bytes.toBytes("2"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))), (Bytes.toBytes("2"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))) + (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))) val hbaseContext = new HBaseContext(sc, config) @@ -231,9 +233,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { TableName.valueOf(tableName), t => { val rowKey = t._1 - val family:Array[Byte] = t._2(0)._1 - val qualifier = t._2(0)._2 - val value = t._2(0)._3 + val family:Array[Byte] = t._2._1 + val qualifier = t._2._2 + val value = t._2._3 val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) @@ -314,7 +316,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { } } - test("Basic Test multi family and multi column tests" + + test("Wide Row Bulk Load: Test multi family and multi column tests" + " with one column family with custom configs plus multi region") { val config = TEST_UTIL.getConfiguration @@ -335,23 +337,23 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { // 5. 
There are records will a single qualifier and some with two val rdd = sc.parallelize(Array( (Bytes.toBytes("1"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), (Bytes.toBytes("3"), - Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a")))), + (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a"))), (Bytes.toBytes("3"), - Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b")))), + (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b"))), (Bytes.toBytes("3"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c"))), (Bytes.toBytes("5"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))), (Bytes.toBytes("4"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))), (Bytes.toBytes("4"), - Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))), + (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))), (Bytes.toBytes("2"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))), + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))), (Bytes.toBytes("2"), - Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))) + (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))) val hbaseContext = new HBaseContext(sc, config) @@ -365,13 +367,13 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options) - hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + hbaseContext.bulkLoad[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))](rdd, TableName.valueOf(tableName), t => { val rowKey = t._1 - val family:Array[Byte] = t._2(0)._1 - val qualifier = t._2(0)._2 - val value = t._2(0)._3 + val family:Array[Byte] = t._2._1 + val qualifier = t._2._2 + val value = t._2._3 val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) @@ -473,7 +475,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { } } - test("bulkLoad partitioner tests") { + test("Test partitioner") { var splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](3) splitKeys(0) = Bytes.toBytes("") @@ -530,8 +532,425 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { assert(5 == partitioner.getPartition(Bytes.toBytes("11"))) assert(6 == partitioner.getPartition(Bytes.toBytes("12"))) assert(6 == partitioner.getPartition(Bytes.toBytes("13"))) + } + + test("Thin Row Bulk Load: Test multi family and multi column tests " + + "with all default HFile Configs") { + val config = TEST_UTIL.getConfiguration + + logInfo(" - creating table " + tableName) + TEST_UTIL.createTable(TableName.valueOf(tableName), + Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2))) + + //There are a number of tests in here. + // 1. Row keys are not in order + // 2. Qualifiers are not in order + // 3. Column Families are not in order + // 4. There are tests for records in one column family and some in two column families + // 5. 
There are records will a single qualifier and some with two + val rdd = sc.parallelize(Array( + ("1", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), + ("3", + (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a"))), + ("3", + (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b"))), + ("3", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c"))), + ("5", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))), + ("4", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))), + ("4", + (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))), + ("2", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))), + ("2", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))). + groupByKey() + + val hbaseContext = new HBaseContext(sc, config) + + testFolder.create() + val stagingFolder = testFolder.newFolder() + + hbaseContext.bulkLoadThinRows[(String, Iterable[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + TableName.valueOf(tableName), + t => { + val rowKey = Bytes.toBytes(t._1) + + val familyQualifiersValues = new FamiliesQualifiersValues + t._2.foreach(f => { + val family:Array[Byte] = f._1 + val qualifier = f._2 + val value:Array[Byte] = f._3 + + familyQualifiersValues +=(family, qualifier, value) + }) + (new ByteArrayWrapper(rowKey), familyQualifiersValues) + }, + stagingFolder.getPath) + + val fs = FileSystem.get(config) + assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2) + + val conn = ConnectionFactory.createConnection(config) + + val load = new LoadIncrementalHFiles(config) + val table = conn.getTable(TableName.valueOf(tableName)) + try { + load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table, + conn.getRegionLocator(TableName.valueOf(tableName))) + + val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells() + assert(cells5.size == 1) + assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3")) + assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a")) + + val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells() + assert(cells4.size == 2) + assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2")) + assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b")) + + val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells() + assert(cells3.size == 3) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2")) + 
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b")) + + + val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells() + assert(cells2.size == 2) + assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2")) + assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b")) + + val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells() + assert(cells1.size == 1) + assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a")) + + } finally { + table.close() + val admin = ConnectionFactory.createConnection(config).getAdmin + try { + admin.disableTable(TableName.valueOf(tableName)) + admin.deleteTable(TableName.valueOf(tableName)) + } finally { + admin.close() + } + fs.delete(new Path(stagingFolder.getPath), true) + + testFolder.delete() + + } + } + + test("Thin Row Bulk Load: Test HBase client: Test Roll Over and " + + "using an implicit call to bulk load") { + val config = TEST_UTIL.getConfiguration + + logInfo(" - creating table " + tableName) + TEST_UTIL.createTable(TableName.valueOf(tableName), + Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2))) + + //There are a number of tests in here. + // 1. Row keys are not in order + // 2. Qualifiers are not in order + // 3. Column Families are not in order + // 4. There are tests for records in one column family and some in two column families + // 5. There are records will a single qualifier and some with two + val rdd = sc.parallelize(Array( + ("1", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), + ("3", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), + ("3", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a"))), + ("3", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c"))), + ("5", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))), + ("4", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))), + ("4", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))), + ("2", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))), + ("2", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))). 
+ groupByKey() + + val hbaseContext = new HBaseContext(sc, config) + + testFolder.create() + val stagingFolder = testFolder.newFolder() + + rdd.hbaseBulkLoadThinRows(hbaseContext, + TableName.valueOf(tableName), + t => { + val rowKey = t._1 + + val familyQualifiersValues = new FamiliesQualifiersValues + t._2.foreach(f => { + val family:Array[Byte] = f._1 + val qualifier = f._2 + val value:Array[Byte] = f._3 + + familyQualifiersValues +=(family, qualifier, value) + }) + (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues) + }, + stagingFolder.getPath, + new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions], + compactionExclude = false, + 20) + + val fs = FileSystem.get(config) + assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 1) + + assert(fs.listStatus(new Path(stagingFolder.getPath+ "/f1")).length == 5) + + val conn = ConnectionFactory.createConnection(config) + + val load = new LoadIncrementalHFiles(config) + val table = conn.getTable(TableName.valueOf(tableName)) + try { + load.doBulkLoad(new Path(stagingFolder.getPath), + conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) + + val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells() + assert(cells5.size == 1) + assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3")) + assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a")) + + val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells() + assert(cells4.size == 2) + assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2")) + assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b")) + + val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells() + assert(cells3.size == 3) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.a")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("b")) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.c")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("c")) + + val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells() + assert(cells2.size == 2) + assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2")) + assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b")) + + val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells() + assert(cells1.size == 1) + assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1")) + 
assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a")) + } finally { + table.close() + val admin = ConnectionFactory.createConnection(config).getAdmin + try { + admin.disableTable(TableName.valueOf(tableName)) + admin.deleteTable(TableName.valueOf(tableName)) + } finally { + admin.close() + } + fs.delete(new Path(stagingFolder.getPath), true) + + testFolder.delete() + } } + test("Thin Row Bulk Load: Test multi family and multi column tests" + + " with one column family with custom configs plus multi region") { + val config = TEST_UTIL.getConfiguration + + val splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](2) + splitKeys(0) = Bytes.toBytes("2") + splitKeys(1) = Bytes.toBytes("4") + + logInfo(" - creating table " + tableName) + TEST_UTIL.createTable(TableName.valueOf(tableName), + Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)), + splitKeys) + + //There are a number of tests in here. + // 1. Row keys are not in order + // 2. Qualifiers are not in order + // 3. Column Families are not in order + // 4. There are tests for records in one column family and some in two column families + // 5. There are records will a single qualifier and some with two + val rdd = sc.parallelize(Array( + ("1", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), + ("3", + (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a"))), + ("3", + (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b"))), + ("3", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c"))), + ("5", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))), + ("4", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))), + ("4", + (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))), + ("2", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))), + ("2", + (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))). 
+ groupByKey() + + val hbaseContext = new HBaseContext(sc, config) + + testFolder.create() + val stagingFolder = testFolder.newFolder() + + val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions] + + val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, + "PREFIX") + + familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options) + + hbaseContext.bulkLoadThinRows[(String, Iterable[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + TableName.valueOf(tableName), + t => { + val rowKey = t._1 + + val familyQualifiersValues = new FamiliesQualifiersValues + t._2.foreach(f => { + val family:Array[Byte] = f._1 + val qualifier = f._2 + val value:Array[Byte] = f._3 + + familyQualifiersValues +=(family, qualifier, value) + }) + (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues) + }, + stagingFolder.getPath, + familyHBaseWriterOptions, + compactionExclude = false, + HConstants.DEFAULT_MAX_FILE_SIZE) + + val fs = FileSystem.get(config) + assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2) + + val f1FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f1")) + for ( i <- 0 until f1FileList.length) { + val reader = HFile.createReader(fs, f1FileList(i).getPath, + new CacheConfig(config), config) + assert(reader.getCompressionAlgorithm.getName.equals("gz")) + assert(reader.getDataBlockEncoding.name().equals("PREFIX")) + } + + assert( 3 == f1FileList.length) + + val f2FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f2")) + for ( i <- 0 until f2FileList.length) { + val reader = HFile.createReader(fs, f2FileList(i).getPath, + new CacheConfig(config), config) + assert(reader.getCompressionAlgorithm.getName.equals("none")) + assert(reader.getDataBlockEncoding.name().equals("NONE")) + } + + assert( 2 == f2FileList.length) + + + val conn = ConnectionFactory.createConnection(config) + + val load = new LoadIncrementalHFiles(config) + val table = conn.getTable(TableName.valueOf(tableName)) + try { + load.doBulkLoad(new Path(stagingFolder.getPath), + conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) + + val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells() + assert(cells5.size == 1) + assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3")) + assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a")) + val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells() + assert(cells4.size == 2) + assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2")) + assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b")) + + val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells() + assert(cells3.size == 3) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2")) + 
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a")) + assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b")) + + + val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells() + assert(cells2.size == 2) + assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a")) + assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2")) + assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b")) + + val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells() + assert(cells1.size == 1) + assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1")) + assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1")) + assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a")) + + } finally { + table.close() + val admin = ConnectionFactory.createConnection(config).getAdmin + try { + admin.disableTable(TableName.valueOf(tableName)) + admin.deleteTable(TableName.valueOf(tableName)) + } finally { + admin.close() + } + fs.delete(new Path(stagingFolder.getPath), true) + + testFolder.delete() + + } + } }
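
For reference, a minimal usage sketch of the thin-row bulk load API introduced above. Only HBaseContext, ByteArrayWrapper, FamiliesQualifiersValues, and the implicit hbaseBulkLoadThinRows call come from this patch; the Spark setup, table name "t1", column family "f1", and staging path are illustrative assumptions, and the final LoadIncrementalHFiles step is only referenced in a comment, as described in the scaladoc.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues, HBaseContext}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object ThinRowBulkLoadExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical setup: local Spark context and default HBase configuration.
    val sc = new SparkContext(new SparkConf().setAppName("thinRowBulkLoad").setMaster("local"))
    val config = HBaseConfiguration.create()
    val hbaseContext = new HBaseContext(sc, config)

    // One (family, qualifier, value) triple per input record, grouped so that
    // each row key reaches bulkLoadThinRows exactly once.
    val rdd = sc.parallelize(Seq(
      ("row1", (Bytes.toBytes("f1"), Bytes.toBytes("a"), Bytes.toBytes("v1"))),
      ("row1", (Bytes.toBytes("f1"), Bytes.toBytes("b"), Bytes.toBytes("v2"))),
      ("row2", (Bytes.toBytes("f1"), Bytes.toBytes("a"), Bytes.toBytes("v3")))
    )).groupByKey()

    // Combine all columns of a row into a FamiliesQualifiersValues before the
    // shuffle, then write HFiles to the staging directory.
    rdd.hbaseBulkLoadThinRows(hbaseContext,
      TableName.valueOf("t1"),
      t => {
        val familyQualifiersValues = new FamiliesQualifiersValues
        t._2.foreach { case (family, qualifier, value) =>
          familyQualifiersValues += (family, qualifier, value)
        }
        (new ByteArrayWrapper(Bytes.toBytes(t._1)), familyQualifiersValues)
      },
      "/tmp/thinRowStaging")

    // Per the scaladoc, LoadIncrementalHFiles.doBulkLoad(...) must then be run
    // against the staging directory to move the generated HFiles into HBase.
    sc.stop()
  }
}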