Apache Hudi / HUDI-7508

Avoid converting iterator to list in HoodieStreamerUtils.createHoodieRecords


Details


    Description

      This block of code is problematic and can lead to an OOM error: it converts the partition iterator into a list and then returns an iterator over that list. As a result, every converted record of the partition is held on the executor heap while this block runs.
      https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java#L86

      records = avroRDD.mapPartitions(
          (FlatMapFunction<Iterator<GenericRecord>, Either<HoodieRecord, String>>) genericRecordIterator -> {
            if (autoGenerateRecordKeys) {
              props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()));
              props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
            }
            BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
            List<Either<HoodieRecord, String>> avroRecords = new ArrayList<>();
            while (genericRecordIterator.hasNext()) {
              GenericRecord genRec = genericRecordIterator.next();
              try {
                HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
                GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
                HoodieRecordPayload payload = shouldCombine
                    ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
                        (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false,
                            props.getBoolean(
                                KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
                                Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
                    : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
                avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload)));
              } catch (Exception e) {
                if (!shouldErrorTable) {
                  throw e;
                }
                avroRecords.add(generateErrorRecord(genRec));
              }
            }
            return avroRecords.iterator();
          });
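
      One possible direction is to wrap the incoming iterator so that each record is converted only when the consumer asks for it, instead of buffering the whole partition in avroRecords. The sketch below uses plain Java with no Spark or Hudi dependencies; lazyMap and convert are hypothetical names chosen for illustration, and convert is only a stand-in for the key generation, payload creation and error-record fallback shown above. It is not the actual fix applied in Hudi.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

public class LazyRecordMapping {

  /**
   * Hypothetical helper: wraps a source iterator so that each element is
   * converted only when next() is called. No intermediate list is built.
   */
  public static <I, O> Iterator<O> lazyMap(Iterator<I> source, Function<I, O> converter) {
    return new Iterator<O>() {
      @Override
      public boolean hasNext() {
        return source.hasNext();
      }

      @Override
      public O next() {
        // Pull one input element and convert it on demand.
        return converter.apply(source.next());
      }
    };
  }

  /**
   * Hypothetical stand-in for the per-record conversion in the ticket:
   * rethrow on failure unless an error table is enabled, in which case
   * an error record is produced instead.
   */
  public static String convert(String raw, boolean shouldErrorTable) {
    try {
      return "record:" + Integer.parseInt(raw);
    } catch (NumberFormatException e) {
      if (!shouldErrorTable) {
        throw e;
      }
      return "error:" + raw;
    }
  }

  public static void main(String[] args) {
    Iterator<String> input = Arrays.asList("1", "x", "3").iterator();
    Iterator<String> output = lazyMap(input, raw -> convert(raw, true));
    while (output.hasNext()) {
      System.out.println(output.next());
    }
  }
}
```

      With this shape, at most one converted record per partition is live at a time from this block's point of view. The lambda passed to mapPartitions could return such a wrapping iterator directly, under the assumption that the key generator and props are set up once before the iterator is created.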
       
       


            People

              vinish_jail97 Vinish Reddy