Spark / SPARK-25405

Saving an RDD as a Sequence File with the new Hadoop API is too restrictive


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: Input/Output
    • Labels: None

    Description

      I tried to transform an HBase export (a sequence file) using a Spark job, and ran into a compilation issue:

      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.hbase.client.Result
      import org.apache.hadoop.hbase.io.ImmutableBytesWritable
      import org.apache.hadoop.hbase.mapreduce.ResultSerialization
      import org.apache.hadoop.io.serializer.WritableSerialization
      import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
      // Old-API output format; this is the class the compiler error below points at
      import org.apache.hadoop.mapred.SequenceFileOutputFormat

      val hc = sc.hadoopConfiguration

      // Register the HBase Result (de)serializer alongside the default Writable one
      val serializers = List(
        classOf[WritableSerialization].getName,
        classOf[ResultSerialization].getName
      ).mkString(",")

      hc.set("io.serializations", serializers)

      // sourcePath points at the HBase export (sequence file) to read
      val c = new Configuration(sc.hadoopConfiguration)
      c.set("mapred.input.dir", sourcePath)
      val subsetRDD = sc.newAPIHadoopRDD(
        c,
        classOf[SequenceFileInputFormat[ImmutableBytesWritable, Result]],
        classOf[ImmutableBytesWritable],
        classOf[Result])

      subsetRDD.saveAsNewAPIHadoopFile(
        "output/sequence",
        classOf[ImmutableBytesWritable],
        classOf[Result],
        classOf[SequenceFileOutputFormat[ImmutableBytesWritable, Result]],
        hc
      )

      During compilation I received:

      Error: type mismatch;
       found   : Class[org.apache.hadoop.mapred.SequenceFileOutputFormat[org.apache.hadoop.hbase.io.ImmutableBytesWritable,org.apache.hadoop.hbase.client.Result]](classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat])
       required: Class[_ <: org.apache.hadoop.mapreduce.OutputFormat[_, _]]
          classOf[SequenceFileOutputFormat[ImmutableBytesWritable, Result]],
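      The required bound points at the new Hadoop API, while the found class is the old-API org.apache.hadoop.mapred.SequenceFileOutputFormat. For comparison, a minimal sketch of the same save call, assuming the only change is importing the new-API output format instead:

      // Sketch: with the new-API class, the Class[_ <: org.apache.hadoop.mapreduce.OutputFormat[_, _]]
      // bound on saveAsNewAPIHadoopFile is satisfied and the call compiles.
      import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

      subsetRDD.saveAsNewAPIHadoopFile(
        "output/sequence",
        classOf[ImmutableBytesWritable],
        classOf[Result],
        classOf[SequenceFileOutputFormat[ImmutableBytesWritable, Result]],
        hc)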

       

      Using the low-level Hadoop API, I could work around the issue as follows:

      import org.apache.hadoop.fs.{FileSystem, Path}
      import org.apache.hadoop.io.{IOUtils, SequenceFile}
      import org.apache.hadoop.io.SequenceFile.{Metadata, Writer}
      import org.apache.hadoop.io.compress.DefaultCodec

      val fs = FileSystem.get(hc)

      val writer = SequenceFile.createWriter(hc, Writer.file(new Path("sample")),
        Writer.keyClass(classOf[ImmutableBytesWritable]),
        Writer.valueClass(classOf[Result]),
        Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size", 4096)),
        Writer.replication(fs.getDefaultReplication()),
        Writer.blockSize(1073741824),
        Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec()),
        Writer.progressable(null),
        Writer.metadata(new Metadata()))

      // The writer lives on the driver, so iterate the RDD locally rather than in a distributed foreach
      subsetRDD.toLocalIterator.foreach(p => writer.append(p._1, p._2))

      IOUtils.closeStream(writer)
      

       

      I think the interface is too restrictive and does not allow passing external (de)serializers.

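      For context, the restriction being described is the bound on the output-format parameter of saveAsNewAPIHadoopFile; a rough sketch of the overload's shape (parameter names are an assumption, not copied from the Spark source):

      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.mapreduce.OutputFormat

      // The key and value classes are unconstrained Class[_] parameters, but the output
      // format must be a new-API (org.apache.hadoop.mapreduce) OutputFormat; the trailing
      // Configuration is where custom io.serializations would be supplied.
      trait SaveAsNewAPIHadoopFileShape {
        def saveAsNewAPIHadoopFile(
            path: String,
            keyClass: Class[_],
            valueClass: Class[_],
            outputFormatClass: Class[_ <: OutputFormat[_, _]],
            conf: Configuration): Unit
      }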
      Attachments

      Activity

      People

        Assignee: Unassigned
        Reporter: Marcin Gasior (marcin.gasior@gmail.com)
        Votes: 0
        Watchers: 1
