HBase / HBASE-20748

HBaseContext bulkLoad: being able to use custom versions


Details

      PR#78: submitted the addition of a bulkLoadWithCustomVersions method to org.apache.hadoop.hbase.spark.HBaseContext.scala.

      This allows bulk-loading cells into HBase with a custom Long version instead of the current system time.

    Description

      The bulk-load methods of class org.apache.hadoop.hbase.spark.HBaseContext (bulkLoad and its twin bulkLoadThinRows) use the system's current time as the version of every cell they bulk-load. This makes both methods unusable if you need your own versioning system, as this excerpt from bulkLoad shows:

      // Here is where we finally iterate through the data in this partition of the
      // RDD that has been sorted and partitioned
      val wl = writeValueToHFile(
        keyFamilyQualifier.rowKey,
        keyFamilyQualifier.family,
        keyFamilyQualifier.qualifier,
        cellValue,
        nowTimeStamp, // the system's current time, used as the version of every cell
        fs,
        conn,
        localTableName,
        conf,
        familyHFileWriteOptionsMapInternal,
        hfileCompression,
        writerMap,
        stagingDir
      )

       

      Thus, I propose a third bulk-load method based on the original bulkLoad. Instead of a flatMap returning an Iterator[(KeyFamilyQualifier, Array[Byte])] as the basis for the writes, the new method would take a flatMap returning an Iterator[(KeyFamilyQualifier, Array[Byte], Long)], where the Long is the version of the cell.
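      As a sketch of what a caller's flatMap could look like under the proposed signature, the Scala below emits one cell per input record and uses the record's own event time as the version. KeyFamilyQualifierStub is a hypothetical stand-in for KeyFamilyQualifier (so the example compiles without HBase on the classpath), and the Reading record, the "d" family and the "value" qualifier are illustrative assumptions.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for KeyFamilyQualifier, so the sketch compiles without HBase.
final case class KeyFamilyQualifierStub(rowKey: Array[Byte], family: Array[Byte], qualifier: Array[Byte])

// Hypothetical input record: one measurement carrying its own event time.
final case class Reading(sensorId: String, value: Double, eventTimeMs: Long)

object CustomVersionFlatMap {
  private def bytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)

  // Proposed flatMap shape: every cell carries its own version (here, the event time).
  val flatMap: Reading => Iterator[(KeyFamilyQualifierStub, Array[Byte], Long)] = r =>
    Iterator(
      (KeyFamilyQualifierStub(bytes(r.sensorId), bytes("d"), bytes("value")),
        bytes(r.value.toString),
        r.eventTimeMs)
    )

  def main(args: Array[String]): Unit = {
    flatMap(Reading("sensor-1", 21.5, 1528900000000L)).foreach { case (kfq, v, version) =>
      println(s"${new String(kfq.rowKey, "UTF-8")} -> ${new String(v, "UTF-8")} @ $version")
    }
  }
}
```

      The only change from a bulkLoad flatMap is the third tuple element; the key, family, qualifier and value are built exactly as before.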

       

      Definition of bulkLoad:

      def bulkLoad[T](rdd: RDD[T],
                      tableName: TableName,
                      flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
                      stagingDir: String,
                      familyHFileWriteOptionsMap:
                      util.Map[Array[Byte], FamilyHFileWriteOptions] =
                      new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                      compactionExclude: Boolean = false,
                      maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE): Unit

      Proposed definition of the new method, bulkLoadCustomVersions:

      def bulkLoadCustomVersions[T](rdd: RDD[T],
                                    tableName: TableName,
                                    flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte], Long)],
                                    stagingDir: String,
                                    familyHFileWriteOptionsMap:
                                    util.Map[Array[Byte], FamilyHFileWriteOptions] =
                                    new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                                    compactionExclude: Boolean = false,
                                    maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE): Unit
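      Because the new signature only adds a Long to each tuple, an existing bulkLoad flatMap could be adapted to it mechanically. The sketch below is one hypothetical way to do so, not part of the proposal: it tags every cell with a sentinel version of -1, which the fallback rule proposed just below would replace with the current timestamp. The names VersionAdapter, UseCurrentTime and withDefaultVersion are illustrative.

```scala
object VersionAdapter {
  // Hypothetical sentinel: any non-positive version would trigger the proposed
  // fallback to the current timestamp.
  val UseCurrentTime: Long = -1L

  // Lifts a flatMap written for bulkLoad into the shape bulkLoadCustomVersions expects.
  def withDefaultVersion[T, K](
      flatMap: T => Iterator[(K, Array[Byte])]): T => Iterator[(K, Array[Byte], Long)] =
    t => flatMap(t).map { case (k, v) => (k, v, UseCurrentTime) }

  def main(args: Array[String]): Unit = {
    val pairs: String => Iterator[(String, Array[Byte])] =
      s => Iterator((s, s.getBytes("UTF-8")))
    val triples = withDefaultVersion[String, String](pairs)
    triples("row1").foreach { case (k, _, version) => println(s"$k @ $version") }
  }
}
```

      Written this way, bulkLoad could in principle delegate to the new method instead of duplicating its body, though that is a design choice for the PR rather than something the proposal states.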

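      In case of an illogical version (for instance, a negative one), the method would fall back to the current timestamp. As written, the check below only keeps versions strictly greater than 0, so a version of 0 also falls back: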

      val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
                    keyFamilyQualifier.family,
                    keyFamilyQualifier.qualifier,
                    cellValue,
                    if (version > 0) version else nowTimeStamp, // custom version, or the current time if not positive
                    fs,
                    conn,
                    localTableName,
                    conf,
                    familyHFileWriteOptionsMapInternal,
                    hfileCompression,
                    writerMap,
                    stagingDir)
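      The fallback rule can be isolated and checked on its own. This minimal sketch mirrors the expression passed to writeValueToHFile above; resolveVersion is a hypothetical helper name, and nowTimeStamp stands for the bulk load's current time.

```scala
object VersionResolution {
  // Mirrors the proposed check: keep a strictly positive custom version,
  // otherwise fall back to the bulk load's current timestamp.
  def resolveVersion(version: Long, nowTimeStamp: Long): Long =
    if (version > 0) version else nowTimeStamp

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    Seq(1528900000000L, 0L, -5L).foreach { v =>
      println(s"requested $v -> written ${resolveVersion(v, now)}")
    }
  }
}
```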

      See the attached file for the full proposed method.

       

      Edit:

      The same could be done with bulkLoadThinRows. Instead of an

      Iterator[Pair[ByteArrayWrapper, FamiliesQualifiersValues]]

      it would expect an

      Iterator[Triple[ByteArrayWrapper, FamiliesQualifiersValues, Long]]

      where the Long is the version of the row's cells.
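      With the thin-rows variant, a single Long accompanies each row, so every cell of that row would share one version. The sketch below illustrates this with a plain Scala tuple in place of Triple and with hypothetical stand-ins RowKeyStub and ColumnsStub for ByteArrayWrapper and FamiliesQualifiersValues; it assumes the same fallback rule as the cell-level method.

```scala
// Hypothetical stand-ins for ByteArrayWrapper and FamiliesQualifiersValues.
final case class RowKeyStub(value: String)
final case class ColumnsStub(cells: Map[(String, String), String]) // (family, qualifier) -> value

object ThinRowVersions {
  // Expands one thin row into individual cells; all of them get the row's version,
  // or the current time if that version is not positive.
  def expand(row: (RowKeyStub, ColumnsStub, Long),
             nowTimeStamp: Long): Seq[(String, String, String, String, Long)] = {
    val (key, cols, version) = row
    val ts = if (version > 0) version else nowTimeStamp
    cols.cells.toSeq.sortBy(_._1).map { case ((fam, qual), v) => (key.value, fam, qual, v, ts) }
  }

  def main(args: Array[String]): Unit = {
    val row = (RowKeyStub("r1"), ColumnsStub(Map(("cf", "a") -> "1", ("cf", "b") -> "2")), 42L)
    expand(row, System.currentTimeMillis()).foreach(println)
  }
}
```

      Versioning per row keeps the thin-rows input compact; callers needing a different version per cell would use the cell-level method instead.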

      Attachments

        bulkLoadCustomVersions.scala (6 kB, Charles PORROT)


            People

              charlesPorrot Charles PORROT
              Votes: 0
              Watchers: 3
