SPARK-22588: Load data from a DataFrame or RDD to DynamoDB / dealing with null values

    Details

    • Type: Question
    • Status: Resolved
    • Priority: Minor
    • Resolution: Invalid
    • Affects Version/s: 2.1.1
    • Fix Version/s: None
    • Component/s: Deploy
    • Labels:

      Description

I am using Spark 2.1 on EMR and I have a DataFrame like this:

ClientNum | Value_1 | Value_2 | Value_3 | Value_4
14        | A       | B       | C       | null
19        | X       | Y       | null    | null
21        | R       | null    | null    | null
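
For reference, a minimal sketch that rebuilds an equivalent DataFrame (hypothetical; the SparkSession setup and the Option-based nulls are assumptions, not part of the report):

import org.apache.spark.sql.SparkSession

// Build the example DataFrame; Option[String] makes the Value_* columns nullable
val spark = SparkSession.builder().appName("ddb-null-example").getOrCreate()
import spark.implicits._

val dfExample = Seq(
  (14, Option("A"), Option("B"), Option("C"), Option.empty[String]),
  (19, Option("X"), Option("Y"), None,        None),
  (21, Option("R"), None,        None,        None)
).toDF("ClientNum", "Value_1", "Value_2", "Value_3", "Value_4")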
I want to load the data into a DynamoDB table, with ClientNum as the key, following these guides:

Analyze Your Data on Amazon DynamoDB with Apache Spark

Using Spark SQL for ETL

Here is the code I tried:

import org.apache.hadoop.mapred.JobConf

// Configure the EMR DynamoDB connector (emr-dynamodb-hadoop)
var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "table_name")
jobConf.set("dynamodb.output.tableName", "table_name")
jobConf.set("dynamodb.endpoint", "dynamodb.eu-west-1.amazonaws.com")
jobConf.set("dynamodb.regionid", "eu-west-1")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1")
jobConf.set("dynamodb.throughput.write", "1")
jobConf.set("dynamodb.throughput.write.percent", "1")

jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

// Import the data (spark-csv package syntax)
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(path)
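
On Spark 2.x the same read also works with the built-in CSV source, without the external package; a minimal equivalent sketch (the spark session name is an assumption):

// Built-in CSV source, equivalent to the com.databricks.spark.csv read above
val df2 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(path)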
I performed a transformation to get an RDD that matches the types the DynamoDB custom output format knows how to write: it expects a tuple containing the Text and DynamoDBItemWritable types.

      Create a new RDD with those types in it, in the following map call:

// Convert the DataFrame to an RDD[Row]
val df_rdd = df.rdd
> df_rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[10] at rdd at <console>:41

// Print the first row
df_rdd.take(1)
> res12: Array[org.apache.spark.sql.Row] = Array([14,A,B,C,null])

import java.util.HashMap

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text

var ddbInsertFormattedRDD = df_rdd.map(a => {
  var ddbMap = new HashMap[String, AttributeValue]()

  var ClientNum = new AttributeValue()
  ClientNum.setN(a.get(0).toString)
  ddbMap.put("ClientNum", ClientNum)

  var Value_1 = new AttributeValue()
  Value_1.setS(a.get(1).toString)
  ddbMap.put("Value_1", Value_1)

  var Value_2 = new AttributeValue()
  Value_2.setS(a.get(2).toString)
  ddbMap.put("Value_2", Value_2)

  var Value_3 = new AttributeValue()
  Value_3.setS(a.get(3).toString)
  ddbMap.put("Value_3", Value_3)

  var Value_4 = new AttributeValue()
  Value_4.setS(a.get(4).toString)
  ddbMap.put("Value_4", Value_4)

  var item = new DynamoDBItemWritable()
  item.setItem(ddbMap)
  (new Text(""), item)
})
      This last call uses the job configuration that defines the EMR-DDB connector to write out the new RDD you created in the expected format:

      ddbInsertFormattedRDD.saveAsHadoopDataset(jobConf)
This fails with the following error:

      Caused by: java.lang.NullPointerException
The null values cause the error: if I try with only ClientNum and Value_1 it works, and the data is correctly inserted into the DynamoDB table.
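
A workaround sketch (not from the report): since Row.get(i).toString throws a NullPointerException on a null field, and DynamoDB will not accept a null string attribute, a null-safe variant of the map can simply omit the missing columns. Assuming the same imports and jobConf as above:

// Null-safe variant (sketch): skip null columns so setS never sees null
val ddbNullSafeRDD = df_rdd.map { row =>
  val ddbMap = new HashMap[String, AttributeValue]()

  // Key column, assumed to always be present
  val clientNum = new AttributeValue()
  clientNum.setN(row.get(0).toString)
  ddbMap.put("ClientNum", clientNum)

  // Only add the optional string attributes that are actually present
  Seq("Value_1", "Value_2", "Value_3", "Value_4").zipWithIndex.foreach {
    case (name, i) =>
      val idx = i + 1 // offset past ClientNum
      if (!row.isNullAt(idx)) {
        val av = new AttributeValue()
        av.setS(row.get(idx).toString)
        ddbMap.put(name, av)
      }
  }

  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)
  (new Text(""), item)
}

ddbNullSafeRDD.saveAsHadoopDataset(jobConf)

Omitting the attribute (rather than writing an empty string) matches DynamoDB's data model, where absent attributes are the normal way to represent missing values.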

Thanks for your help!


People

• Assignee: Unassigned
• Reporter: Saanvi Sharma (iamsaanvi)
• Votes: 0
• Watchers: 1


Time Tracking

• Original Estimate: 24h
• Remaining Estimate: 24h
• Time Spent: Not Specified