Details
Type: Bug
Status: Open
Priority: P1
Resolution: Unresolved
Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0, 2.21.0, 2.22.0
Fix Versions: None
Environment: macOS Catalina, tested with SparkRunner - Spark 2.4.5
Labels: Important
Description
FileIO writeDynamic with AvroIO.sink is not writing all data in the following pipeline. The amount of data written varies between runs, but records are consistently dropped. This is with a very small test dataset of 6 records, which should produce 3 output directories.
Pipeline p = Pipeline.create(options);

PCollection<KV<String, AvroRecord>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
     .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));

// write out into AVRO in each separate directory
records.apply("Write avro file per dataset",
    FileIO.<String, KV<String, AvroRecord>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue),
             Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(key -> defaultNaming(key + "/export", PipelinesVariables.Pipeline.AVRO_EXTENSION)));

p.run().waitUntilFinish();
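(StringToDatasetIDAvroRecordFcn and the AvroRecord class are not included in this report. For anyone trying to reproduce, a minimal sketch of what such a DoFn might look like, assuming each input line starts with a dataset ID followed by a payload; the CSV layout, field name, and builder call here are assumptions, not the actual code:)

static class StringToDatasetIDAvroRecordFcn extends DoFn<String, KV<String, AvroRecord>> {
  @ProcessElement
  public void processElement(@Element String line, OutputReceiver<KV<String, AvroRecord>> out) {
    // Assumed layout: datasetId,payload — the real parsing logic is not in the report
    String[] parts = line.split(",", 2);
    AvroRecord record = AvroRecord.newBuilder() // assumes an Avro-generated builder
        .setValue(parts[1])                     // hypothetical field
        .build();
    out.output(KV.of(parts[0], record));
  }
}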
If I replace AvroIO.sink() with TextIO.sink() (and swap the initial mapping function accordingly), the correct number of records is written to the separate directories. This works consistently.
e.g.
// Initialise pipeline
Pipeline p = Pipeline.create(options);

PCollection<KV<String, String>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
     .apply(ParDo.of(new StringToDatasetIDKVFcn()));

// write out into CSV in each separate directory
records.apply("Write CSV file per dataset",
    FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(datasetID -> defaultNaming(datasetID + "/export", ".csv")));

p.run().waitUntilFinish();
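(One way to quantify the loss when reproducing — not part of the original report — is to read the written Avro files back in a second pipeline and assert the count against the expected 6 records. The glob pattern below is an assumption about the directory layout produced by defaultNaming(key + "/export", ...):)

// Diagnostic sketch: read back everything the first pipeline wrote and count it
Pipeline check = Pipeline.create(options);
PCollection<Long> written =
    check.apply(AvroIO.read(AvroRecord.class)
            .from(options.getTargetPath() + "/*/export*.avro")) // assumed layout/extension
         .apply(Count.globally());
PAssert.thatSingleton(written).isEqualTo(6L); // 6 input records expected
check.run().waitUntilFinish();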