
NumberSequenceIterator and Accumulator threading issue

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.1.2
    • Fix Version/s: 1.2.0, 1.1.4
    • Component/s: DataSet API
    • Labels:
      None

      Description

      There is a strange problem when using the NumberSequenceIterator in combination with an AverageAccumulator.

      It seems like the individual accumulators are reinitialized and overwrite parts of intermediate solutions.

      The following Scala snippet demonstrates the problem.

      The result should be 50.5, but the job prints something completely different, such as 8.08, depending on the number of cores used.

      If the parallelism is set to 1, the result is correct, which suggests a threading problem.

      The problem occurs with both the Java and the Scala API.

      env
        .fromParallelCollection(new NumberSequenceIterator(1, 100))
        .map(new RichMapFunction[Long, Long] {
          var a: AverageAccumulator = _

          override def map(value: Long): Long = {
            a.add(value)
            value
          }

          override def open(parameters: Configuration): Unit = {
            a = new AverageAccumulator
            getRuntimeContext.addAccumulator("test", a)
          }
        })
        .reduce((a, b) => a + b)
        .print()
      
      
      val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
      
      println(lastJobExecutionResult.getAccumulatorResult("test"))
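
For comparison, here is a minimal plain-Java sketch (no Flink involved) of why the expected result is 50.5 at any parallelism: if every parallel instance keeps a running sum and a count, and the partial results are merged by adding sums and counts, the average cannot depend on how the values were distributed. The class `PartitionedAverageSketch`, its `Partial` type, and the round-robin distribution are hypothetical names and assumptions made for illustration; they are not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionedAverageSketch {

    /** Partial aggregate: keeps a running sum and a count, not a running mean. */
    static final class Partial {
        double sum;
        long count;

        void add(long value) {
            sum += value;
            count++;
        }

        /** Merges another partial by adding its sum and its count. */
        void merge(Partial other) {
            sum += other.sum;
            count += other.count;
        }

        double average() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    /** Distributes 1..n round-robin over `parallelism` partials, then merges them. */
    static double averageOf(long n, int parallelism) {
        List<Partial> partials = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partials.add(new Partial());
        }
        for (long v = 1; v <= n; v++) {
            partials.get((int) ((v - 1) % parallelism)).add(v);
        }
        Partial total = new Partial();
        for (Partial p : partials) {
            total.merge(p);
        }
        return total.average();
    }

    public static void main(String[] args) {
        // Each line reports 50.5, independent of the simulated parallelism.
        for (int p : new int[] {1, 4, 16}) {
            System.out.println("parallelism " + p + " -> " + averageOf(100, p));
        }
    }
}
```

A result that changes with the number of cores therefore points at the step where partial results are combined rather than at the per-element `add` calls.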
      

          Activity

          jkirsch Johannes added a comment -

          Complete Scala Testcase

          jkirsch Johannes added a comment -

          The test case also fails when using a plain collection followed by a rebalance to create a parallel data set:

          fromCollection(1 to 100).rebalance() 
          

          So the problem does not seem to be specific to the NumberSequenceIterator.

          Either the accumulator is initialized incorrectly in the sample code, or there is a deeper issue.

          jkirsch Johannes added a comment -

          Scala unit test

          githubbot ASF GitHub Bot added a comment -

          GitHub user greghogan opened a pull request:

          https://github.com/apache/flink/pull/2639

          FLINK-4586 [core] Broken AverageAccumulator

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/greghogan/flink 4586_broken_averageaccumulator

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2639.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2639


          commit 58d79b40ed837542e727c5f4f3a410ccb5f9ff24
          Author: Greg Hogan <code@greghogan.com>
          Date: 2016-10-14T20:18:52Z

          FLINK-4586 [core] Broken AverageAccumulator


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2639#discussion_r83643344

          --- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java ---
          @@ -28,51 +28,52 @@
           public class AverageAccumulator implements SimpleAccumulator<Double> {

               private static final long serialVersionUID = 3672555084179165255L;

          -    private double localValue;
          +
               private long count;

          +    private double sum;
          +
               @Override
               public void add(Double value) {
                   this.count++;
          -        this.localValue += value;
          +        this.sum += value;
               }

               public void add(double value) {
                   this.count++;
          -        this.localValue += value;
          +        this.sum += value;
               }

               public void add(long value) {
                   this.count++;
          -        this.localValue += value;
          +        this.sum += value;
               }

               public void add(int value) {
                   this.count++;
          -        this.localValue += value;
          +        this.sum += value;
               }

               @Override
               public Double getLocalValue() {
                   if (this.count == 0) {
                       return 0.0;
                   }
          -        return this.localValue / (double)this.count;
          +        return this.sum / this.count;
               }

               @Override
               public void resetLocal() {
                   this.count = 0;
          -        this.localValue = 0;
          +        this.sum = 0;
               }

               @Override
               public void merge(Accumulator<Double, Double> other) {
                   if (other instanceof AverageAccumulator) {
          -            AverageAccumulator temp = (AverageAccumulator)other;
          -            this.count += temp.count;
          -            this.localValue += other.getLocalValue();
          --- End diff --

          I guess this was the buggy line?

          githubbot ASF GitHub Bot added a comment -

          Github user greghogan commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2639#discussion_r84054277


          Yes, and the test did not catch the bug since it was only merging the average of two single values (where the sum is the average).
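
The effect described in this comment can be reproduced with a small plain-Java sketch. It models only the arithmetic: `mergeAddingAverage` mirrors the removed line (`this.localValue += other.getLocalValue()`), which adds the other accumulator's average to the local sum. The quoted diff is cut off before the replacement line, so `mergeAddingSum` is an assumption about what a correct merge looks like, and `AverageMergeSketch` and `Avg` are hypothetical names, not Flink classes.

```java
class AverageMergeSketch {

    /** Minimal stand-in for an average accumulator: a running sum plus a count. */
    static final class Avg {
        double sum;
        long count;

        Avg add(double value) {
            sum += value;
            count++;
            return this;
        }

        double average() {
            return count == 0 ? 0.0 : sum / count;
        }

        /** Mirrors the removed line: adds the other side's AVERAGE to the local sum. */
        void mergeAddingAverage(Avg other) {
            count += other.count;
            sum += other.average();
        }

        /** Assumed correct merge: adds the other side's SUM to the local sum. */
        void mergeAddingSum(Avg other) {
            count += other.count;
            sum += other.sum;
        }
    }

    static Avg of(double... values) {
        Avg avg = new Avg();
        for (double v : values) {
            avg.add(v);
        }
        return avg;
    }

    static double mergedAverage(boolean addAverage, Avg left, Avg right) {
        if (addAverage) {
            left.mergeAddingAverage(right);
        } else {
            left.mergeAddingSum(right);
        }
        return left.average();
    }

    public static void main(String[] args) {
        // One value per side: the sum equals the average, so both merges agree.
        System.out.println(mergedAverage(true, of(1), of(2)));        // 1.5
        System.out.println(mergedAverage(false, of(1), of(2)));       // 1.5

        // Two values per side: the merges diverge.
        System.out.println(mergedAverage(true, of(1, 2), of(3, 4)));  // 1.625
        System.out.println(mergedAverage(false, of(1, 2), of(3, 4))); // 2.5
    }
}
```

A test that merges accumulators holding more than one value each would have exposed the difference.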

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2639

          greghogan Greg Hogan added a comment -

          Fixed in
          1.2.0: 428419d599d138f1647f84807d6d0224652f3d1b
          1.1.4: 9c87f92cbb9989281024c5c300cd18b962ac4357


            People

            • Assignee: greghogan Greg Hogan
            • Reporter: jkirsch Johannes
            • Votes: 0
            • Watchers: 4
