Details
Type: Improvement
Status: Open
Priority: Major
Resolution: Unresolved
Description
The bulkLoad method of class org.apache.hadoop.hbase.spark.HBaseContext uses the system's current time as the version of the cells it bulk-loads. This makes the method, and its twin bulkLoadThinRows, unusable when you need to apply your own versioning scheme:
//Here is where we finally iterate through the data in this partition of the
//RDD that has been sorted and partitioned
val wl = writeValueToHFile(keyFamilyQualifier.rowKey, keyFamilyQualifier.family,
  keyFamilyQualifier.qualifier, cellValue, nowTimeStamp, fs, conn, localTableName,
  conf, familyHFileWriteOptionsMapInternal, hfileCompression, writerMap, stagingDir)
I therefore propose a third bulkLoad method, based on the original one. Instead of taking an Iterator[(KeyFamilyQualifier, Array[Byte])] as the basis for the writes, the new method would take an Iterator[(KeyFamilyQualifier, Array[Byte], Long)], the Long being the cell version.
Definition of bulkLoad:
def bulkLoad[T](rdd: RDD[T],
    tableName: TableName,
    flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
    stagingDir: String,
    familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] =
      new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
    compactionExclude: Boolean = false,
    maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE):
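For context, here is a minimal sketch of how a caller uses the existing method today; the HBaseContext setup, table name, staging directory and record layout are placeholder assumptions. There is nowhere to pass a version, so every cell is written with nowTimeStamp:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// sc is an existing SparkContext; the HBaseContext ties Spark to the HBase configuration.
val sc: SparkContext = ???
val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

// Records carry rowKey, family, qualifier and value, but no version.
val records: RDD[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])] = ???

hbaseContext.bulkLoad[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])](records,
  TableName.valueOf("my_table"),
  r => Iterator((new KeyFamilyQualifier(r._1, r._2, r._3), r._4)),
  "/tmp/bulkload_staging")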
Definition of the proposed bulkLoadCustomVersions method:
def bulkLoadCustomVersions[T](rdd: RDD[T],
    tableName: TableName,
    flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte], Long)],
    stagingDir: String,
    familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] =
      new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
    compactionExclude: Boolean = false,
    maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE):
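With the proposed method, the caller would simply carry the version along as the third element of each tuple. A rough sketch, reusing the hbaseContext and imports from the sketch above (the method does not exist yet, and the record layout is illustrative):

// Each record now carries its own version as the trailing Long.
val versionedRecords: RDD[(Array[Byte], Array[Byte], Array[Byte], Array[Byte], Long)] = ???

hbaseContext.bulkLoadCustomVersions[(Array[Byte], Array[Byte], Array[Byte], Array[Byte], Long)](
  versionedRecords,
  TableName.valueOf("my_table"),
  r => Iterator((new KeyFamilyQualifier(r._1, r._2, r._3), r._4, r._5)),
  "/tmp/bulkload_staging")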
In case of an illogical version (for instance, a negative one), the method would fall back to the current timestamp:
val wl = writeValueToHFile(keyFamilyQualifier.rowKey, keyFamilyQualifier.family,
  keyFamilyQualifier.qualifier, cellValue, if (version > 0) version else nowTimeStamp,
  fs, conn, localTableName, conf, familyHFileWriteOptionsMapInternal,
  hfileCompression, writerMap, stagingDir)
See the attached file for the full proposed method.
Edit:
The same could be done with bulkLoadThinRows: instead of an
Iterator[Pair[ByteArrayWrapper, FamiliesQualifiersValues]]
we would expect an
Iterator[Triple[ByteArrayWrapper, FamiliesQualifiersValues, Long]]
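A rough sketch of what the mapping for this thin-rows variant could look like, written with plain Scala tuples; the Record type is purely illustrative, and note that a single version would then apply to all cells of the row:

import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues}

// Illustrative input record: one row with a single cell and an explicit version.
case class Record(rowKey: Array[Byte], family: Array[Byte],
                  qualifier: Array[Byte], value: Array[Byte], version: Long)

// The third element of the triple is the version the row's cells would be written with.
def toThinRow(r: Record): Iterator[(ByteArrayWrapper, FamiliesQualifiersValues, Long)] = {
  val familiesQualifiersValues = new FamiliesQualifiersValues
  familiesQualifiersValues += (r.family, r.qualifier, r.value)
  Iterator((new ByteArrayWrapper(r.rowKey), familiesQualifiersValues, r.version))
}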