Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Not A Problem
Affects Version/s: 1.0.0
Fix Version/s: None
Description
When trying to output a DataSet as a Sequence file using the Scala API, an exception is thrown whenever anything other than a LongWritable is supplied as the key. The following simple unit test demonstrates this:
test("Simple DataSet with IntWritable Key and Value") { val path = TmpDir + "flinkOutput" implicit val typeInfo = createTypeInformation[(Int,Int)] val ds = env.fromElements[(Int, Int)]((1,2), (3,4), (5,6), (7,8)) val writableDataset : DataSet[(IntWritable, IntWritable)] = ds.map( tuple => (new IntWritable(tuple._1.asInstanceOf[Int]), new IntWritable(tuple._2.asInstanceOf[Int])) ) val job: Job = new Job() // setup sink for IntWritable val sequenceFormat = new SequenceFileOutputFormat[IntWritable, IntWritable] val hadoopOutput = new HadoopOutputFormat[IntWritable, IntWritable](sequenceFormat, job) FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path)) writableDataset.output(hadoopOutput) env.execute(s"dfsWrite($path)") }
The above code will throw the following exception:
{...}
1 [DataSink (org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat@4d518b32) (1/1)] ERROR org.apache.flink.runtime.operators.DataSinkTask - Error in user code: wrong key class: org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.LongWritable: DataSink (org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat@4d518b32) (1/1)
java.io.IOException: wrong key class: org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1305)
    at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:83)
    at org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat.writeRecord(HadoopOutputFormat.scala:30)
    at org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat.writeRecord(HadoopOutputFormat.scala:26)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:200)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
When trying to write a DataSet to a Hadoop Sequence file, the API always seems to expect a key of type LongWritable and a value of type Text. Similar exceptions are thrown when other Writable types are used as the key.
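This appears to be standard Hadoop behaviour rather than something specific to Flink's wrapper: SequenceFileOutputFormat takes the key and value classes for its SequenceFile.Writer from the Job configuration, which falls back to LongWritable and Text when nothing is set, and the writer then rejects records whose key class does not match. A minimal sketch of a workaround, reusing writableDataset, path and env from the test above, would be to declare the expected Writable classes on the Job before attaching the sink:

// Sketch of a possible workaround: tell the Job which Writable classes the
// SequenceFileOutputFormat should expect. The format reads these from the job
// configuration, and the LongWritable/Text defaults are what trigger the
// "wrong key class" error above.
val job: Job = new Job()
job.setOutputKeyClass(classOf[IntWritable])
job.setOutputValueClass(classOf[IntWritable])

val sequenceFormat = new SequenceFileOutputFormat[IntWritable, IntWritable]
val hadoopOutput = new HadoopOutputFormat[IntWritable, IntWritable](sequenceFormat, job)
FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))

writableDataset.output(hadoopOutput)
env.execute(s"dfsWrite($path)")

With the output key and value classes set to IntWritable, the SequenceFile.Writer created by the format matches the records being written and the append call no longer throws.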