BEAM-12279

Implement destination-dependent sharding in FileIO.writeDynamic


    Details

    • Type: Improvement
    • Status: Triage Needed
    • Priority: P1
    • Resolution: Unresolved
    • Affects Version/s: 2.28.0
    • Fix Version/s: None
    • Labels: None

      Description

      Destination-dependent sharding is needed to keep file sizes and file counts manageable in Google Cloud Storage, especially when data volumes are very large.

      The current implementation does not allow this, per the documentation: https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.html

       

      Note that currently sharding can not be destination-dependent: every window/pane for every destination will use the same number of shards specified via FileIO.Write.withNumShards(int) or FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>).
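
To make the request concrete, here is a minimal sketch of the arithmetic that destination-dependent sharding would allow: one shard count per destination, derived from how much data that destination receives. `ShardPlanner`, `shardsFor`, `plan`, and the target file size are hypothetical names used for illustration only. They are not part of the Beam API, and the sketch assumes the per-destination byte estimates are available from somewhere.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not a Beam class): picks a shard count per destination. */
final class ShardPlanner {
    private ShardPlanner() {}

    /**
     * Returns the smallest shard count such that, if data is spread evenly,
     * each file holds at most targetBytesPerFile. Always returns at least 1.
     */
    static int shardsFor(long estimatedBytes, long targetBytesPerFile) {
        if (targetBytesPerFile <= 0) {
            throw new IllegalArgumentException("targetBytesPerFile must be positive");
        }
        if (estimatedBytes <= 0) {
            return 1;
        }
        // Ceiling division written so that it cannot overflow.
        long shards = (estimatedBytes - 1) / targetBytesPerFile + 1;
        return (int) Math.min(Integer.MAX_VALUE, shards);
    }

    /** Applies shardsFor to every destination, preserving iteration order. */
    static Map<String, Integer> plan(Map<String, Long> estimatedBytesByDestination,
                                     long targetBytesPerFile) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : estimatedBytesByDestination.entrySet()) {
            result.put(e.getKey(), shardsFor(e.getValue(), targetBytesPerFile));
        }
        return result;
    }
}
```

With a single global value from `withNumShards(int)`, a small destination and a large destination are both split into the same number of files, which is what produces the very small and very large files described in this issue.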

       

      We use it as follows and end up with either very small or very large files per destination in the same window. The large files cannot be opened or processed, and the small files clutter the bucket.

      Pipeline pipeline = Pipeline.create(options);

      pipeline
          // Read Pub/Sub messages together with their attributes.
          .apply("Read PubSub Events",
              PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
          // Fixed windows; late data is accepted for 24h.
          .apply(options.getWindowDuration() + " Window",
              Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
                  .triggering(AfterWatermark.pastEndOfWindow())
                  .discardingFiredPanes()
                  .withAllowedLateness(parseDuration("24h"))
                  // This later triggering(...) call replaces the one set above.
                  .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())))
          // The destination is chosen by datePartition; every destination gets the same shard count.
          .apply(FileIO.<String, PubsubMessage>writeDynamic()
              .by(new datePartition(options.getOutputFilenamePrefix(), options.getTimestampName()))
              .via(
                  Contextful.fn(
                      (SerializableFunction<PubsubMessage, String>)
                          inputMsg -> new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
                  TextIO.sink())
              .withDestinationCoder(StringUtf8Coder.of())
              .to(options.getOutputDirectory())
              .withNaming(type -> new CrowdStrikeFileNaming(type))
              .withNumShards(options.getNumShards())
              .withTempDirectory(options.getTempLocation()));

      pipeline.run();
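
Until destination-dependent sharding exists, one possible interim approach is to fold a shard index into the destination itself, so that a busy destination becomes several destinations. The sketch below shows only the key computation in plain Java. `ShardedDestination`, `keyFor`, and the `/shard-N` suffix are hypothetical and not part of Beam, and the sketch assumes the shard count for each destination is already known, for example from configuration.

```java
import java.util.Arrays;

/** Hypothetical helper (not a Beam class): builds a composite destination key. */
final class ShardedDestination {
    private ShardedDestination() {}

    /**
     * Appends a shard index in [0, shardsForDestination) to the destination.
     * The index is derived from a hash of the payload bytes, so the same
     * payload always maps to the same composite key.
     */
    static String keyFor(String destination, byte[] payload, int shardsForDestination) {
        if (shardsForDestination < 1) {
            throw new IllegalArgumentException("shardsForDestination must be >= 1");
        }
        // floorMod keeps the index non-negative even for negative hash codes.
        int shard = Math.floorMod(Arrays.hashCode(payload), shardsForDestination);
        return destination + "/shard-" + shard;
    }
}
```

In a pipeline like the one above, the function passed to `.by(...)` might return such a composite key while `withNumShards(1)` is used, so that each composite destination produces one file per window and pane. The naming function would then need to split the key back into its destination and shard parts. This has not been verified against the reporter's pipeline and is not a substitute for the requested feature.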
      

       

            People

            • Assignee: Unassigned
            • Reporter: Inessa Yakubov (inessa@cyber.nyc.gov)