Beam / BEAM-11282

Cannot set compression level when writing compressed files


    Details

    • Type: Improvement
    • Status: Triage Needed
    • Priority: P2
    • Resolution: Unresolved
    • Affects Version/s: 2.25.0
    • Fix Version/s: None
    • Component/s: sdk-py-core
    • Labels:

      Description

      CompressedFile._initialize_compressor (in apache_beam.io.filesystem) hardcodes the compression level used when writing:

       
      self._compressor = zlib.compressobj(
                zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, self._gzip_mask)
       
      It would be useful to be able to control this: I have a large set of GZIP-compressed files, and writing the same data back out produces output 10x larger than the input.
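For context, the knob in question is the `level` argument of `zlib.compressobj`. The following standard-library sketch compresses the same bytes into a gzip stream at several levels so the resulting sizes can be compared. It assumes that gzip framing is selected with `wbits = zlib.MAX_WBITS | 16` (standard zlib behaviour; Beam's `_gzip_mask` is assumed to serve the same purpose), and the TSV sample rows are hypothetical.

```python
import gzip
import zlib

# Assumption: adding 16 to MAX_WBITS tells zlib to emit a gzip header and
# trailer, which is assumed to match what Beam's _gzip_mask does.
GZIP_WBITS = zlib.MAX_WBITS | 16


def gzip_bytes(data: bytes, level: int) -> bytes:
    """Compress `data` into a gzip stream at zlib level 0-9 (or -1 for default)."""
    compressor = zlib.compressobj(level, zlib.DEFLATED, GZIP_WBITS)
    return compressor.compress(data) + compressor.flush()


if __name__ == "__main__":
    # Hypothetical sample data: repetitive TSV rows, similar in shape to a .tsv.gz file.
    rows = "".join(f"{i}\tuser_{i % 100}\tvalue_{i % 7}\n" for i in range(50_000))
    data = rows.encode("utf-8")

    for level in (1, zlib.Z_DEFAULT_COMPRESSION, zlib.Z_BEST_COMPRESSION):
        out = gzip_bytes(data, level)
        # Every level yields a valid gzip stream that round-trips to the input.
        assert gzip.decompress(out) == data
        print(f"level={level:>2}: {len(data)} -> {len(out)} bytes")
```

Only the size and CPU cost differ between levels; the decompressed content is identical.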
       
      I've tried various monkeypatching approaches. These seem to work with the local runner but fail when using DataflowRunner. For example:
       
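      import zlib

      import apache_beam as beam
      from apache_beam.io import WriteToText
      from apache_beam.io.filesystem import CompressedFile, CompressionTypes


      class WriteData(beam.PTransform):
          def __init__(self, dst):
              super().__init__()
              self._dst = dst

              # Monkeypatch CompressedFile so output uses Z_BEST_COMPRESSION
              # instead of the hardcoded Z_DEFAULT_COMPRESSION. The patch is
              # applied when this transform is constructed, i.e. in the process
              # that builds the pipeline. It replaces the method for every
              # compression type, so it assumes only GZIP output is written.
              def _initialize_compressor(self):
                  self._compressor = zlib.compressobj(
                      zlib.Z_BEST_COMPRESSION, zlib.DEFLATED, self._gzip_mask
                  )

              CompressedFile._initialize_compressor = _initialize_compressor

          def expand(self, p):
              return p | WriteToText(
                  file_path_prefix=self._dst,
                  file_name_suffix=".tsv.gz",
                  compression_type=CompressionTypes.GZIP,
              )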
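A likely reason the patch fails on DataflowRunner: `__init__` runs only in the process that constructs the pipeline, while Dataflow workers import `apache_beam` in their own processes, so their `CompressedFile` class is never patched. One way to avoid patching entirely is to do the compression inside a custom sink. `GzipLevelSink` here is hypothetical and not part of Beam: it mirrors the `open`/`write`/`flush` methods of `apache_beam.io.fileio.FileSink`, but imports nothing from Beam so it can be exercised on its own.

```python
import gzip
import io


class GzipLevelSink:
    """Hypothetical sink writing newline-delimited text as gzip at a chosen level.

    Mirrors the open/write/flush shape of apache_beam.io.fileio.FileSink;
    in a real pipeline it would subclass that class.
    """

    def __init__(self, compresslevel: int = 9):
        self._compresslevel = compresslevel
        self._gz = None

    def open(self, fh):
        # Wrap the (assumed uncompressed) handle supplied by the writer.
        self._gz = gzip.GzipFile(fileobj=fh, mode="wb", compresslevel=self._compresslevel)

    def write(self, record):
        self._gz.write(record.encode("utf-8") + b"\n")

    def flush(self):
        # Closing the GzipFile writes the gzip trailer. GzipFile does not close
        # a fileobj it did not open, so the caller still owns `fh`.
        self._gz.close()


if __name__ == "__main__":
    # Local check: an in-memory buffer stands in for the file handle.
    buf = io.BytesIO()
    sink = GzipLevelSink(compresslevel=9)
    sink.open(buf)
    sink.write("a\tb")
    sink.write("c\td")
    sink.flush()
    print(gzip.decompress(buf.getvalue()))
```

In a pipeline this would presumably be passed as `fileio.WriteToFiles(path=..., sink=lambda dest: GzipLevelSink(9), file_naming=fileio.default_file_naming(prefix, ".tsv.gz"))`. Whether `WriteToFiles` hands the sink an uncompressed handle when the final name ends in `.gz` is an assumption worth verifying, since the output would otherwise be compressed twice.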

            People

            • Assignee:
              Unassigned
              Reporter:
              Jack Whelpton
            • Votes:
              0
              Watchers:
              2
