Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Not A Problem
Affects Version/s: 2.2.0
Fix Version/s: None
Component/s: None
Description
I tried to transform an HBase export (sequence file) using a Spark job, and faced a compilation issue:
val hc = sc.hadoopConfiguration
val serializers = List(
  classOf[WritableSerialization].getName,
  classOf[ResultSerialization].getName
).mkString(",")
hc.set("io.serializations", serializers)

val c = new Configuration(sc.hadoopConfiguration)
c.set("mapred.input.dir", sourcePath)

val subsetRDD = sc.newAPIHadoopRDD(
  c,
  classOf[SequenceFileInputFormat[ImmutableBytesWritable, Result]],
  classOf[ImmutableBytesWritable],
  classOf[Result])

subsetRDD.saveAsNewAPIHadoopFile(
  "output/sequence",
  classOf[ImmutableBytesWritable],
  classOf[Result],
  classOf[SequenceFileOutputFormat[ImmutableBytesWritable, Result]],
  hc)
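For context, the imports are not shown above; a plausible reconstruction (hypothetical, inferred from the class names and the compiler error below) mixes the new org.apache.hadoop.mapreduce input format with the old org.apache.hadoop.mapred output format:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.ResultSerialization
import org.apache.hadoop.io.serializer.WritableSerialization
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
// old-API class; the likely source of the mismatch reported below
import org.apache.hadoop.mapred.SequenceFileOutputFormat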
During compilation I received:
Error: type mismatch;
 found   : Class[org.apache.hadoop.mapred.SequenceFileOutputFormat[org.apache.hadoop.hbase.io.ImmutableBytesWritable,org.apache.hadoop.hbase.client.Result]](classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat])
 required: Class[_ <: org.apache.hadoop.mapreduce.OutputFormat[_, _]]
    classOf[SequenceFileOutputFormat[ImmutableBytesWritable, Result]],
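The message points at an old-vs-new API clash: saveAsNewAPIHadoopFile requires an org.apache.hadoop.mapreduce OutputFormat, while classOf[SequenceFileOutputFormat[...]] resolves to the org.apache.hadoop.mapred class. A minimal sketch, assuming the new-API SequenceFileOutputFormat is acceptable here, would be:

import org.apache.hadoop.mapreduce.lib.output.{SequenceFileOutputFormat => NewSequenceFileOutputFormat}

subsetRDD.saveAsNewAPIHadoopFile(
  "output/sequence",
  classOf[ImmutableBytesWritable],
  classOf[Result],
  // new-API class satisfies Class[_ <: org.apache.hadoop.mapreduce.OutputFormat[_, _]]
  classOf[NewSequenceFileOutputFormat[ImmutableBytesWritable, Result]],
  hc)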
By using the low-level Hadoop API, I could work around the issue with the following:
val writer = SequenceFile.createWriter(hc,
  Writer.file(new Path("sample")),
  Writer.keyClass(classOf[ImmutableBytesWritable]),
  Writer.valueClass(classOf[Result]),
  Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size", 4096)),
  Writer.replication(fs.getDefaultReplication()),
  Writer.blockSize(1073741824),
  Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec()),
  Writer.progressable(null),
  Writer.metadata(new Metadata()))

subset.foreach(p => writer.append(p._1, p._2))
IOUtils.closeStream(writer)
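For reference, the workaround relies on an fs handle and the standard Hadoop I/O classes that are not shown; a minimal setup sketch, assuming fs is the FileSystem backing the output path, would be:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{IOUtils, SequenceFile}
import org.apache.hadoop.io.SequenceFile.{Metadata, Writer}
import org.apache.hadoop.io.compress.DefaultCodec

// hc is the Hadoop Configuration built earlier; fs backs the target path
val fs = FileSystem.get(hc)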
I think the interface is too restrictive and does not allow passing external (de)serializers.