Details
- Type: Bug
- Status: Resolved
- Priority: P2
- Resolution: Fixed
- Fix Version: 0.6.0
- Labels: None
Description
I have a code snippet as follows:
Read.Bounded<KV<LongWritable, Text>> from = Read.from(
    HDFSFileSource.from(options.getInputFile(),
        TextInputFormat.class, LongWritable.class, Text.class));
PCollection<KV<LongWritable, Text>> data = p.apply(from);
data.apply(MapElements.via(new SimpleFunction<KV<LongWritable, Text>, String>() {
  @Override
  public String apply(KV<LongWritable, Text> input) {
    return input.getValue() + "\t" + input.getValue();
  }
})).apply(Write.to(HDFSFileSink.<String>toText(options.getOutputFile())));
and submit the job like this:
spark-submit --class org.apache.beam.examples.WordCountHDFS --master yarn-client \
  ./target/word-count-beam-bundled-0.1.jar \
  --runner=SparkRunner \
  --inputFile=hdfs://master/tmp/input/ \
  --outputFile=/tmp/output/
Then the HDFSFileSink.validate function checks whether the /tmp/output/ directory exists on the local filesystem (not on HDFS), since the output path carries no hdfs:// scheme.
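The same resolution behavior can be reproduced with plain Hadoop APIs, outside Beam. The following is a minimal sketch (the class name ValidateSketch is mine; this is not Beam's actual validation code): with a fresh Configuration that has no cluster settings loaded, a scheme-less Path resolves against fs.defaultFS, which defaults to file:///.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ValidateSketch {
  public static void main(String[] args) throws Exception {
    // Fresh driver-side Configuration: unless core-site.xml is on the
    // classpath, fs.defaultFS falls back to file:///.
    Configuration conf = new Configuration();
    Path output = new Path("/tmp/output/"); // no scheme, like --outputFile above
    FileSystem fs = output.getFileSystem(conf);
    System.out.println(fs.getUri());       // file:/// in that case
    System.out.println(fs.exists(output)); // existence check hits the local disk
  }
}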
But the final result is actually written to the hdfs://master/tmp/output/ directory on HDFS.
The reason is that the HDFSFileSink class does not use the same Hadoop Configuration on the master and the slaves: validation runs on the driver, where the default configuration resolves scheme-less paths to the local filesystem, while the actual write runs on the executors, whose configuration points at HDFS.
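Until the sink propagates a single configuration to both sides, a likely workaround (my suggestion, not verified against a fix) is to pass a fully qualified output URI, so that path resolution no longer depends on each side's fs.defaultFS:

spark-submit --class org.apache.beam.examples.WordCountHDFS --master yarn-client \
  ./target/word-count-beam-bundled-0.1.jar \
  --runner=SparkRunner \
  --inputFile=hdfs://master/tmp/input/ \
  --outputFile=hdfs://master/tmp/output/

With an explicit hdfs://master scheme, the driver-side existence check and the executor-side write both resolve the same HDFS filesystem.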