Beam / BEAM-6859

Spark-runner: DoFn tearDown function will not be invoked if there is no data in the batch

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P1
    • Resolution: Fixed
    • Affects Version/s: 2.8.0, 2.9.0, 2.10.0, 2.11.0
    • Fix Version/s: 2.13.0
    • Component/s: runner-spark
    • Labels: None

    Description

      In the implementation of MultiDoFnFunction:

      @Override
      public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
          throws Exception {
        if (!wasSetupCalled) {
          DoFnInvokers.invokerFor(doFn).invokeSetup();
          wasSetupCalled = true;
        }
        // ...

        return new SparkProcessContext<>(
                doFn,
                doFnRunnerWithMetrics,
                outputManager,
                stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator())
            .processPartition(iter)
            .iterator();
      }

      This calls the DoFn's setup method for every batch in Spark Streaming, while the DoFn's teardown method is invoked through the SparkProcessContext instance. However, when the partition is empty, SparkProcessContext.processPartition() returns an empty ArrayList immediately. Teardown is invoked from within the ProcCtxtIterator instance, which is only created when the partition contains data (partition.hasNext() == true). Consequently, when a batch contains no data, the DoFn's teardown method is never invoked, even though setup was called for that batch.
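      To make the lifecycle mismatch concrete, here is a hypothetical, heavily simplified model in plain Java, without Beam or Spark. ResourceDoFn, processPartition and runBatches are illustrative stand-ins, not the runner's classes. The model assumes setup is called once per batch and that teardown only runs when the lazily created output iterator is drained, so every empty batch leaves one resource open.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of the bug; names are illustrative, not Beam APIs. */
final class EmptyBatchTeardownModel {

  /** Stand-in for a DoFn that opens a resource in setup and releases it in teardown. */
  static final class ResourceDoFn {
    static int openResources = 0; // e.g. live Kafka producers

    void setup() {
      openResources++;
    }

    void teardown() {
      openResources--;
    }
  }

  /** Mirrors the buggy shape: teardown is reachable only through the output iterator. */
  static Iterator<String> processPartition(ResourceDoFn fn, Iterator<String> partition) {
    if (!partition.hasNext()) {
      // Early return: no output iterator is created, so teardown() is never reached.
      return Collections.emptyIterator();
    }
    return new Iterator<String>() {
      private boolean tornDown = false;

      @Override
      public boolean hasNext() {
        boolean more = partition.hasNext();
        if (!more && !tornDown) {
          fn.teardown(); // teardown is tied to draining the output iterator
          tornDown = true;
        }
        return more;
      }

      @Override
      public String next() {
        return partition.next().toUpperCase();
      }
    };
  }

  /** Runs one "batch" per input list and returns how many resources are still open. */
  static int runBatches(List<List<String>> batches) {
    ResourceDoFn.openResources = 0;
    for (List<String> batch : batches) {
      ResourceDoFn fn = new ResourceDoFn();
      fn.setup(); // setup once per batch, as described for MultiDoFnFunction
      Iterator<String> out = processPartition(fn, batch.iterator());
      while (out.hasNext()) {
        out.next(); // the consumer drains the output iterator
      }
    }
    return ResourceDoFn.openResources;
  }

  public static void main(String[] args) {
    List<List<String>> batches = List.of(List.of("a", "b"), List.of(), List.of(), List.of("c"));
    System.out.println("open resources after 4 batches: " + runBatches(batches));
  }
}
```

      Running main prints "open resources after 4 batches: 2": each of the two empty batches leaves one resource open, analogous to the Kafka producers that accumulate in the reproduction described below.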

      Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition) throws Exception {

        // skip if partition is empty.
        if (!partition.hasNext()) {
          return new ArrayList<>();
        }

        // process the partition; finishBundle() is called from within the output iterator.
        return this.getOutputIterable(partition, doFnRunner);
      }
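      One way to close the gap is to invoke teardown in the empty-partition branch before returning. The sketch below shows that idea on a similarly simplified hypothetical model (plain Java, illustrative names only). It is an illustration under those assumptions, not necessarily the change that shipped in 2.13.0.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of a fixed processPartition; names are illustrative, not Beam APIs. */
final class EmptyBatchTeardownFixModel {

  /** Stand-in for a DoFn that opens a resource in setup and releases it in teardown. */
  static final class ResourceDoFn {
    static int openResources = 0;

    void setup() {
      openResources++;
    }

    void teardown() {
      openResources--;
    }
  }

  static Iterator<String> processPartition(ResourceDoFn fn, Iterator<String> partition) {
    if (!partition.hasNext()) {
      // Fix: release the DoFn's resources even though there is nothing to process.
      fn.teardown();
      return Collections.emptyIterator();
    }
    return new Iterator<String>() {
      private boolean tornDown = false;

      @Override
      public boolean hasNext() {
        boolean more = partition.hasNext();
        if (!more && !tornDown) {
          fn.teardown(); // non-empty case: tear down once the output is fully drained
          tornDown = true;
        }
        return more;
      }

      @Override
      public String next() {
        return partition.next().toUpperCase();
      }
    };
  }

  /** Runs one "batch" per input list and returns how many resources are still open. */
  static int runBatches(List<List<String>> batches) {
    ResourceDoFn.openResources = 0;
    for (List<String> batch : batches) {
      ResourceDoFn fn = new ResourceDoFn();
      fn.setup(); // setup once per batch
      Iterator<String> out = processPartition(fn, batch.iterator());
      while (out.hasNext()) {
        out.next();
      }
    }
    return ResourceDoFn.openResources;
  }

  public static void main(String[] args) {
    List<List<String>> batches = List.of(List.of("a", "b"), List.of(), List.of(), List.of("c"));
    System.out.println("open resources after 4 batches: " + runBatches(batches));
  }
}
```

      With four batches, two of them empty, this version prints "open resources after 4 batches: 0", because every setup is now matched by a teardown.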
      
      

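      To reproduce the issue, build a pipeline that reads from Kafka with KafkaIO.read() and writes back to Kafka with KafkaIO.write(), run it as a Spark Streaming application, and do not send any data to the Kafka topic. The number of Kafka producer threads keeps increasing, presumably because the producer opened in the writer's setup is never closed by its teardown, and the application eventually runs out of memory (OOM).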

       

       

People

    • Assignee: Michael Luckey
    • Reporter: Ron Cai

Time Tracking

    • Original Estimate: Not Specified
    • Remaining Estimate: 0h
    • Time Spent: 1.5h