Spark / SPARK-5356

Write to HBase from Spark

Details

    • Type: Question
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 1.1.0
    • Fix Version/s: None
    • Component/s: Examples, Spark Shell
    • Linux

    Description

      I am able to read from HBase in Spark, but I am not able to write rows to HBase from Spark.
      I am on Cloudera 5.0 (Spark 1.1.0 and HBase 0.98.6). So far, this is what I have.

      I have an RDD, localData. How can I save it to HBase, and how can I use saveAsHadoopDataset?
      import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
      import org.apache.hadoop.hbase.mapreduce.TableInputFormat
      import org.apache.spark.rdd.NewHadoopRDD
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable
      import org.apache.hadoop.hbase.client.Result
      import org.apache.hadoop.hbase.mapred.TableOutputFormat
      import org.apache.hadoop.mapred.JobConf
      //Create RDD
      val localData = sc.textFile("/home/hbase_example/antiwari/scala_code/resources/scala_load_file.txt")
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "localhost")
      conf.set("hbase.zookeeper.property.clientPort","2181")
      val jobConfig: JobConf = new JobConf(conf, this.getClass)
      jobConfig.setOutputFormat(classOf[TableOutputFormat])
      jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "spark_data")
      /*Contents of scala_load_file.txt
      0000000001, Name01, Field1
      0000000002, Name02, Field2
      0000000003, Name03, Field3
      0000000004, Name04, Field4
      */
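Each line of the sample file has three comma-separated fields, while the `convert` function quoted further down expects an `(Int, String, String)` triple. A minimal sketch of that parsing step follows. `parseLine` is a hypothetical helper name, not part of Spark or HBase, and it assumes the first field always fits in an `Int`.

```scala
import scala.util.Try

// Hypothetical helper: split one line of scala_load_file.txt into
// (id, name, field). Lines that do not have exactly three fields, or whose
// first field is not a valid Int, yield None instead of throwing.
def parseLine(line: String): Option[(Int, String, String)] =
  line.split(",").map(_.trim) match {
    case Array(id, name, field) => Try(id.toInt).toOption.map(n => (n, name, field))
    case _                      => None
  }

// Possible use in the Spark shell, where localData is the RDD created above:
//   val triples = localData.flatMap(line => parseLine(line))
```

Returning an `Option` lets malformed lines be dropped with `flatMap` rather than failing the whole job.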

      I looked at many examples online, including http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/admin_hbase_import.html... , but I get the following error (maybe because I am on Spark 1.1.0 and the example is old):

      scala> def convert(triple: (Int, String, String)) = {
           |   val p = new Put(Bytes.toBytes(triple._1))
           |   p.add(Bytes.toBytes("cf"),
           |     Bytes.toBytes("col_1"), Bytes.toBytes(triple._2))
           |   p.add(Bytes.toBytes("cf"),
           |     Bytes.toBytes("col_2"), Bytes.toBytes(triple._3))
           |   (new ImmutableBytesWritable, p)
           | }

      <console>:18: error: not found: type Put
      val p = new Put(Bytes.toBytes(triple._1))
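The Scala compiler reports `not found: type Put` when a class is used without being imported, and the import list in the description includes neither `org.apache.hadoop.hbase.client.Put` nor `org.apache.hadoop.hbase.util.Bytes`. The sketch below adds those two imports and wires `convert` to `saveAsHadoopDataset`. It has not been run against the reporter's cluster. It assumes the HBase 0.98 client API (where `Put.add(family, qualifier, value)` is available), an existing table with a column family named `cf`, and the ZooKeeper settings from the description. `saveToHBase` is a hypothetical helper name.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put                 // missing in the original imports
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes                 // missing in the original imports
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext._                    // pair-RDD functions on Spark 1.1
import org.apache.spark.rdd.RDD

// Turn one (id, name, field) triple into the key/value pair that the
// old-API (mapred) TableOutputFormat expects. The row key is the 4-byte
// encoding of the Int id.
def convert(triple: (Int, String, String)): (ImmutableBytesWritable, Put) = {
  val p = new Put(Bytes.toBytes(triple._1))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_1"), Bytes.toBytes(triple._2))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_2"), Bytes.toBytes(triple._3))
  (new ImmutableBytesWritable, p)
}

// Hypothetical helper: write the triples to an existing HBase table.
// The quorum and port values are the ones from the description.
def saveToHBase(triples: RDD[(Int, String, String)], tableName: String): Unit = {
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "localhost")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  val jobConfig = new JobConf(conf)
  jobConfig.setOutputFormat(classOf[TableOutputFormat])
  jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)
  triples.map(convert).saveAsHadoopDataset(jobConfig)
}
```

In the Spark shell this could be tried with a small in-memory RDD, for example `saveToHBase(sc.parallelize(Seq((1, "Name01", "Field1"))), "spark_data")`, provided the `spark_data` table and its `cf` column family already exist.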

          People

            Assignee: Unassigned
            Reporter: Ani (aniruddh02)